Posted to user@spark.apache.org by Cui Lin <cu...@hds.com> on 2015/03/05 06:43:15 UTC
How to parse Json formatted Kafka message in spark streaming
Friends,
I'm trying to parse JSON-formatted Kafka messages and then write them to Cassandra. I have two problems:
1. I got the exception below. How do I check for an empty RDD?
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
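import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._ // saveToCassandra, SomeColumns
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
messages.foreachRDD { rdd =>
  // keep only the message value (the JSON string) of each (key, value) pair
  val message: RDD[String] = rdd.map { y => y._2 }
  // jsonRDD infers the schema with RDD.reduce, which throws "empty collection" on an empty batch
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}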
2. How do I get all column names from the JSON messages? Each JSON-formatted message has hundreds of columns.
Thanks for your help!
Best regards,
Cui Lin
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Cui Lin <cu...@hds.com>.
Hi, Helena,
I think your new version only fits JSON with a small number of columns. I couldn’t find MonthlyCommits, but I assume it has only a few columns that are defined manually.
In my case there are hundreds of columns, so it is not feasible to define a class for them.
Is there any way to get the column names instead of hard-coding “time” in this case?
mapper.readValue[Map[String,Any]](x).get("time")
Best regards,
Cui Lin
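One way to avoid hard-coding “time”, sketched under the assumption that jackson-databind and jackson-module-scala (the libraries in Akhil's snippet) are on the classpath: parse each message into a Scala Map and read its key set. JsonColumns is a hypothetical helper, not part of Spark or of the thread's code.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical helper object; not part of Spark or of the original code.
object JsonColumns {
  // Built lazily once per JVM (driver or executor) instead of once per message.
  lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule) // lets Jackson produce Scala Maps
    m
  }

  // Parses one JSON object into a Scala Map keyed by column name.
  def parse(json: String): Map[String, Any] =
    mapper.readValue(json, classOf[Map[String, Any]])

  // All top-level column names of one message, however many there are.
  def columnNames(json: String): Set[String] = parse(json).keySet
}
```

On the stream, messages.map { case (_, json) => JsonColumns.columnNames(json) }.reduce(_ ++ _) should give, per batch, the union of the column names seen in that batch.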
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Helena Edelson <he...@datastax.com>.
Great point :) Cui, here’s a cleaner way than I had before, without using Spark SQL for the mapping:
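import org.json4s._                    // DefaultFormats
import org.json4s.native.JsonParser
implicit val formats = DefaultFormats  // extract[...] needs an implicit Formats in scope
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
  .saveToCassandra("githubstats", "monthly_commits")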
HELENA EDELSON
Senior Software Engineer, DSE Analytics
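A self-contained sketch of the json4s route, assuming json4s-native is on the classpath. The extract[...] call resolves field names and types through an implicit Formats value, and because the thread never shows MonthlyCommits, the case class below is a hypothetical stand-in. The same parsed JValue can also report its field names, which may suit messages with hundreds of columns better than a case class.

```scala
import org.json4s._
import org.json4s.native.JsonMethods._

// Hypothetical stand-in: the thread never shows the real MonthlyCommits fields.
case class MonthlyCommits(user: String, month: Int, commits: Int)

object Json4sSketch {
  // extract[...] resolves field names and types through this implicit Formats.
  implicit val formats: Formats = DefaultFormats

  // Typed route: one case class per message layout.
  def toCommits(json: String): MonthlyCommits =
    parse(json).extract[MonthlyCommits]

  // Untyped route: top-level field names, with no case class at all.
  def fieldNames(json: String): List[String] = parse(json) match {
    case JObject(fields) => fields.map { case (name, _) => name }
    case _               => Nil // arrays and scalars have no field names
  }
}
```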
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Ted Yu <yu...@gmail.com>.
See the following thread about the 1.3.0 release:
http://search-hadoop.com/m/JW1q5hV8c4
Looks like the release is around the corner.
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Cui Lin <cu...@hds.com>.
Hi, Ted,
Thanks for your reply. According to the link below, partitions.size will not work for checking whether an RDD in a stream is empty. It seems the problem is solved in Spark 1.3; is there really no way to download it yet?
https://issues.apache.org/jira/browse/SPARK-5270
Best regards,
Cui Lin
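Until RDD.isEmpty() from SPARK-5270 is available in Spark 1.3, a workaround is to test rdd.take(1) before calling jsonRDD, whose schema inference is what throws on an empty batch. A minimal sketch against the Spark 1.2-era SQLContext API; JsonBatch and its methods are hypothetical names.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Hypothetical helper for one micro-batch of JSON strings (Spark 1.2-era API).
object JsonBatch {
  // take(1) stops as soon as it finds one element.
  def hasData(json: RDD[String]): Boolean = json.take(1).nonEmpty

  // Column names inferred by jsonRDD, or an empty Seq for an empty batch
  // instead of the "empty collection" exception from schema inference.
  def inferredColumns(sqlContext: SQLContext, json: RDD[String]): Seq[String] =
    if (!hasData(json)) Seq.empty
    else sqlContext.jsonRDD(json).schema.fields.map(_.name).toSeq
}
```

Inside foreachRDD, wrapping the jsonRDD and saveToCassandra calls in if (JsonBatch.hasData(message)) { ... } should skip empty batches; from 1.3 on, !message.isEmpty() is the built-in equivalent.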
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Ted Yu <yu...@gmail.com>.
Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.
Cheers
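partitions.size counts partitions rather than elements, so it is only a proxy for emptiness, which is the limitation Cui raises elsewhere in this thread. A small sketch, assuming a local SparkContext, of an RDD that keeps its partitions after a filter removes every element; PartitionsVsEmpty is a hypothetical name.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper contrasting two "is this batch empty?" checks.
object PartitionsVsEmpty {
  // Counts partitions only: how the data is split, not whether any exists.
  def looksEmptyByPartitions[T](rdd: RDD[T]): Boolean = rdd.partitions.size == 0

  // Element-based check usable before RDD.isEmpty() exists (Spark < 1.3).
  def isEmptyByTake[T](rdd: RDD[T]): Boolean = rdd.take(1).isEmpty

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("partitions-vs-empty"))
    // Three partitions, then every element is filtered out; the partitions remain.
    val filtered = sc.parallelize(1 to 9, 3).filter(_ > 100)
    println(s"partitions: ${filtered.partitions.size}")                  // partitions: 3
    println(s"empty by partitions: ${looksEmptyByPartitions(filtered)}") // empty by partitions: false
    println(s"empty by take(1): ${isEmptyByTake(filtered)}")             // empty by take(1): true
    sc.stop()
  }
}
```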
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
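When you use KafkaUtils.createStream with StringDecoders, it returns (key, value) pairs of Strings in your messages stream. To access the elements of the JSON, you could do something like the following:
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper // package varies across jackson-module-scala versions
val mapStream = messages.map { case (_, json) => // each element is a (key, value) pair
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](json).get("time")
}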
Thanks
Best Regards
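The snippet above builds a new ObjectMapper for every message. A hedged variation creates one mapper per partition with mapPartitions and keeps the whole parsed Map instead of a single field; it assumes the same Jackson libraries and uses the plain readValue(String, Class) overload rather than ScalaObjectMapper. JsonStream and parseValues are hypothetical names.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.rdd.RDD

// Hypothetical helper; not part of Spark or of the original snippet.
object JsonStream {
  // Turns the (key, value) pairs from KafkaUtils.createStream into parsed Maps,
  // creating one ObjectMapper per partition rather than one per message.
  def parseValues(rdd: RDD[(String, String)]): RDD[Map[String, Any]] =
    rdd.mapPartitions { records =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule) // Scala Map support
      records.map { case (_, json) => mapper.readValue(json, classOf[Map[String, Any]]) }
    }
}
```

On the stream this could be applied as messages.transform(rdd => JsonStream.parseValues(rdd)), giving a DStream of Maps whose keys are the column names.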
Re: How to parse Json formatted Kafka message in spark streaming
Posted by Helena Edelson <he...@datastax.com>.
Hi Cui,
What version of Spark are you using? There was a bug ticket that may be related to this; its fix, in core/src/main/scala/org/apache/spark/rdd/RDD.scala, is merged into versions 1.3.0 and 1.2.1. If you are using 1.1.1, that may be the reason, but it’s a stretch: https://issues.apache.org/jira/browse/SPARK-4968
Did you verify that you have data streaming from Kafka?
Helena
https://twitter.com/helenaedelson