Posted to dev@spark.apache.org by Sea <26...@qq.com> on 2015/06/26 11:06:16 UTC

Time is ugly in Spark Streaming....

Hi, all


I found a problem in Spark Streaming: when I use the time argument passed to foreachRDD, the timestamps I get look very strange.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
  try {
    if (!rdd.partitions.isEmpty) {
      rdd.foreachPartition(partition => {
        handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
      })
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }
})


val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
var date = dateFormat.format(new Date(time.milliseconds))

Then I write the 'date' into Kafka, but this is what I found:


{"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
{"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
{"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
{"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
{"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
{"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
{"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
{"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
{"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}

Re: Time is ugly in Spark Streaming....

Posted by Sea <26...@qq.com>.
Yes, I make it.




------------------ Original Message ------------------
From: "Gerard Maas";<ge...@gmail.com>;
Sent: Friday, June 26, 2015, 5:40 PM
To: "Sea"<26...@qq.com>;
Cc: "user"<us...@spark.apache.org>; "dev"<de...@spark.apache.org>;
Subject: Re: Time is ugly in Spark Streaming....



Are you sharing the SimpleDateFormat instance? This looks a lot more like the non-thread-safe behaviour of SimpleDateFormat (that has claimed many unsuspecting victims over the years), than any 'ugly' Spark Streaming. Try writing the timestamps in millis to Kafka and compare.

-kr, Gerard.


On Fri, Jun 26, 2015 at 11:06 AM, Sea <26...@qq.com> wrote:
Hi, all


I find a problem in spark streaming, when I use the time in function foreachRDD... I find the time is very interesting.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
  try {
    if (!rdd.partitions.isEmpty) {
      rdd.foreachPartition(partition => {
        handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
      })
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }
})


val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
var date = dateFormat.format(new Date(time.milliseconds))

Then I insert the 'date' into Kafka , but I found .....


{"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
{"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
{"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
{"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
{"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
{"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
{"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
{"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
{"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}

Re: Time is ugly in Spark Streaming....

Posted by Gerard Maas <ge...@gmail.com>.
Are you sharing the SimpleDateFormat instance? This looks a lot more like
the non-thread-safe behaviour of SimpleDateFormat (that has claimed many
unsuspecting victims over the years), than any 'ugly' Spark Streaming. Try
writing the timestamps in millis to Kafka and compare.
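
As a rough illustration of the thread-safety point (a sketch, not code from this thread): it assumes the formatter was being shared between concurrently running tasks, and the names SafeTimestamps, format and formatConcurrently are hypothetical. Giving each thread its own SimpleDateFormat keeps the formatter's internal state from being mixed between threads.

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical helper, not part of Spark or of the original job.
object SafeTimestamps {
  private val Pattern = "yyyy-MM-dd'T'HH:mm:ss"

  // One SimpleDateFormat per thread: no two threads ever touch the same
  // mutable formatter state.
  private val perThread: ThreadLocal[SimpleDateFormat] =
    new ThreadLocal[SimpleDateFormat] {
      override def initialValue(): SimpleDateFormat = new SimpleDateFormat(Pattern)
    }

  def format(millis: Long): String = perThread.get().format(new Date(millis))

  // Formats the same instant from several threads at once and returns the
  // distinct strings produced. With per-thread formatters there should be
  // exactly one distinct value.
  def formatConcurrently(millis: Long, threads: Int, callsPerThread: Int): Set[String] = {
    val perWorker = Array.fill(threads)(Set.empty[String])
    val workers = (0 until threads).map { i =>
      new Thread(new Runnable {
        override def run(): Unit =
          perWorker(i) = (1 to callsPerThread).map(_ => format(millis)).toSet
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    perWorker.foldLeft(Set.empty[String])(_ ++ _)
  }
}
```

In a Spark job, a simpler variant of the same idea may be to construct the SimpleDateFormat inside the foreachPartition closure, so that each task works with its own instance.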

-kr, Gerard.

On Fri, Jun 26, 2015 at 11:06 AM, Sea <26...@qq.com> wrote:

> Hi, all
>
> I find a problem in spark streaming, when I use the time in function foreachRDD...
> I find the time is very interesting.
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>
> dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
>   try {
>     if (!rdd.partitions.isEmpty) {
>       rdd.foreachPartition(partition => {
>         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
>       })
>     }
>   } catch {
>     case e: Exception => e.printStackTrace()
>   }
> })
>
>
> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>
> var date = dateFormat.format(new Date(time.milliseconds))
>
>
> Then I insert the 'date' into Kafka , but I found .....
>
>
> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>
> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>
> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>
> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
>
> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>
> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>
> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>
> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>
>

Re: Time is ugly in Spark Streaming....

Posted by Sea <26...@qq.com>.
Yes, things work well now. It was a problem with SimpleDateFormat. Thank you all.




------------------ Original Message ------------------
From: "Dumas Hwang";<du...@gmail.com>;
Sent: Saturday, June 27, 2015, 8:16 PM
To: "Tathagata Das"<td...@databricks.com>;
Cc: "Emrehan Tüzün"<em...@gmail.com>; "Sea"<26...@qq.com>; "dev"<de...@spark.apache.org>; "user"<us...@spark.apache.org>;
Subject: Re: Time is ugly in Spark Streaming....



Java's SimpleDateFormat is not thread safe.  You can consider using DateTimeFormatter if you are using Java 8 or Joda-time

On Sat, Jun 27, 2015 at 3:32 AM, Tathagata Das <td...@databricks.com> wrote:
Could you print the "time" on the driver (that is, in foreachRDD but before RDD.foreachPartition) and see if it is behaving weird?

TD


On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün <em...@gmail.com> wrote:
 





On Fri, Jun 26, 2015 at 12:30 PM, Sea <26...@qq.com> wrote:

 Hi, all
 

 I find a problem in spark streaming, when I use the time in function foreachRDD... I find the time is very interesting. 
 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
 dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
   try {
     if (!rdd.partitions.isEmpty) {
       rdd.foreachPartition(partition => {
         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
       })
     }
   } catch {
     case e: Exception => e.printStackTrace()
   }
 })
 

 val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  var date = dateFormat.format(new Date(time.milliseconds)) 
 
 Then I insert the 'date' into Kafka , but I found .....
  

 {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
 {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
 {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
 {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
 {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
 {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
 {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
 {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
 {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}

Re: Time is ugly in Spark Streaming....

Posted by Dumas Hwang <du...@gmail.com>.
Java's SimpleDateFormat is not thread-safe. You can consider using
DateTimeFormatter if you are on Java 8, or Joda-Time.
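
A minimal sketch of the DateTimeFormatter option, assuming Java 8 or later. The object name IsoTimestamps and the choice of UTC are illustrative assumptions; the thread does not say which time zone the job used. Because java.time.format.DateTimeFormatter is immutable and thread-safe, a single shared instance can be used from many threads.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of Spark or of the original job.
object IsoTimestamps {
  // Same pattern as in the original post. The zone (UTC) is an assumption
  // made for this sketch; an Instant cannot be formatted without one.
  private val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneOffset.UTC)

  // Converts epoch milliseconds (for example time.milliseconds) to a string.
  def format(millis: Long): String =
    formatter.format(Instant.ofEpochMilli(millis))
}
```

Calling IsoTimestamps.format(time.milliseconds) would then stand in for the SimpleDateFormat call in the original snippet.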

On Sat, Jun 27, 2015 at 3:32 AM, Tathagata Das <td...@databricks.com> wrote:

> Could you print the "time" on the driver (that is, in foreachRDD but
> before RDD.foreachPartition) and see if it is behaving weird?
>
> TD
>
> On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün <em...@gmail.com>
> wrote:
>
>>
>>
>>
>>
>> On Fri, Jun 26, 2015 at 12:30 PM, Sea <26...@qq.com> wrote:
>>
>>> Hi, all
>>>
>>> I find a problem in spark streaming, when I use the time in function foreachRDD...
>>> I find the time is very interesting.
>>>
>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>
>>> dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
>>>   try {
>>>     if (!rdd.partitions.isEmpty) {
>>>       rdd.foreachPartition(partition => {
>>>         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
>>>       })
>>>     }
>>>   } catch {
>>>     case e: Exception => e.printStackTrace()
>>>   }
>>> })
>>>
>>>
>>> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>>>
>>>  var date = dateFormat.format(new Date(time.milliseconds))
>>>
>>>
>>>  Then I insert the 'date' into Kafka , but I found .....
>>>
>>>
>>> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>>>
>>> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>>>
>>> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>>>
>>> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
>>> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
>>>
>>> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>>>
>>> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>>>
>>> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>>>
>>> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>>>
>>>
>>
>

Re: Time is ugly in Spark Streaming....

Posted by Tathagata Das <td...@databricks.com>.
Could you print the "time" on the driver (that is, in foreachRDD but before
RDD.foreachPartition) and see if it is behaving weird?

TD

On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün <em...@gmail.com>
wrote:

>
>
>
>
> On Fri, Jun 26, 2015 at 12:30 PM, Sea <26...@qq.com> wrote:
>
>> Hi, all
>>
>> I find a problem in spark streaming, when I use the time in function foreachRDD...
>> I find the time is very interesting.
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>
>> dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
>>   try {
>>     if (!rdd.partitions.isEmpty) {
>>       rdd.foreachPartition(partition => {
>>         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
>>       })
>>     }
>>   } catch {
>>     case e: Exception => e.printStackTrace()
>>   }
>> })
>>
>>
>> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>>
>>  var date = dateFormat.format(new Date(time.milliseconds))
>>
>>
>>  Then I insert the 'date' into Kafka , but I found .....
>>
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>>
>> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
>> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>>
>>
>

Re: Time is ugly in Spark Streaming....

Posted by Emrehan Tüzün <em...@gmail.com>.

On Fri, Jun 26, 2015 at 12:30 PM, Sea <26...@qq.com> wrote:

> Hi, all
> I find a problem in spark streaming, when I use the time in function foreachRDD... I find the time is very interesting.
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
>   try {
>     if (!rdd.partitions.isEmpty) {
>       rdd.foreachPartition(partition => {
>         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
>       })
>     }
>   } catch {
>     case e: Exception => e.printStackTrace()
>   }
> })
> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
> var date = dateFormat.format(new Date(time.milliseconds))
> Then I insert the 'date' into Kafka , but I found .....
> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}