Posted to user@flink.apache.org by "ext.mwalker" <ex...@riotgames.com> on 2017/03/06 19:57:12 UTC

Issues with Event Time and Kafka

Hi Folks,

We are working on a Flink job to process a large amount of data coming in
from a Kafka stream.

We selected Flink because the data is sometimes out of order or late, and we
need to roll the data up into 30-minute event-time windows, after which we
are writing it back out to an S3 bucket.

We have hit a couple issues:

1) The job works fine using processing time, but when we switch to event
time (almost) nothing seems to be written out.
Our watermark code looks like this:
```
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness);
  }
```
And we are doing this:
```
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```
and this:
```
    .assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
```

However, even though we get millions of records per hour (the vast majority
of which are no more than 30 minutes late), we get only 2 - 10 records per
hour written out to the S3 bucket.
We are using a custom Bucketer for the BucketingFileSink; if folks believe
that is the issue, I would be happy to provide that code here as well.

2) On top of all this, we would really prefer to write the records directly
to Aurora in RDS rather than to an intermediate S3 bucket, but it seems that
the JDBC sink connector is unsupported / doesn't exist.
If this is not the case, we would love to know.
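
Even without an official connector, a plain RichSinkFunction over java.sql
seems like it could serve as a stopgap; below is a rough, unbatched sketch
(the table name and the key/count fields on the Row type shown later in the
thread are hypothetical):
```
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Sketch only: no batching, retries, or checkpoint integration.
class AuroraSink(url: String, user: String, pass: String)
    extends RichSinkFunction[Row] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, pass)
    // The "rollups" table and its columns are hypothetical.
    stmt = conn.prepareStatement(
      "INSERT INTO rollups (event_key, window_min_time, event_count) VALUES (?, ?, ?)")
  }

  override def invoke(value: Row): Unit = {
    stmt.setString(1, value.key) // hypothetical field
    stmt.setLong(2, value.minTime)
    stmt.setLong(3, value.count) // hypothetical field
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```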

Thanks in advance for all the help / insight on this,

Max Walker




Re: Issues with Event Time and Kafka

Posted by Aljoscha Krettek <al...@apache.org>.
What Robert said is correct. However, that behaviour depends on the
Trigger. You can write your own Trigger that behaves differently when
late data arrives; that is, you could write a trigger that never fires
for late data (see the sketch below). In that case, though, you could
also simply set the allowed lateness to zero. You could also write a
trigger that waits for a certain number of late elements to arrive and
then triggers a firing.
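
For illustration, a rough, untested sketch of such a trigger for tumbling
event-time windows; it mirrors the built-in EventTimeTrigger, except that an
element arriving after the watermark has passed the end of its window is
silently ignored instead of causing an immediate firing:
```
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class EventTimeIgnoreLateTrigger[T] extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark) {
      // A late element: the built-in EventTimeTrigger would return FIRE here.
      TriggerResult.CONTINUE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE
    else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}
```
You would plug it in with .trigger(new EventTimeIgnoreLateTrigger[Row]) on
the windowed stream.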


Best,

Aljoscha






Re: Issues with Event Time and Kafka

Posted by Robert Metzger <rm...@apache.org>.
Hi Ethan,

how late elements (elements with an event time older than the current
watermark) are handled depends on the operator. Flink's window operators
will fire the window again for each late element that falls into the
"allowed lateness" timeframe (see the sketch below). Otherwise, late
elements are dropped.
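
For a 30-minute roll-up that should still accept elements up to 30 minutes
behind the watermark, that looks roughly like this (a sketch; the key field
and the reduce function are placeholders for whatever you aggregate):
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

def rollUp(rows: DataStream[Row]): DataStream[Row] =
  rows
    .keyBy(_.key) // hypothetical key field on Row
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    // Late elements within 30 minutes of the watermark re-fire the window;
    // anything later is dropped (the default allowed lateness is zero).
    .allowedLateness(Time.minutes(30))
    .reduce((a, b) => if (a.minTime <= b.minTime) a else b) // placeholder aggregate
```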


Re: Issues with Event Time and Kafka

Posted by "ext.eformichella" <ex...@riotgames.com>.
Thanks for the suggestion, we can definitely try that out.

My one concern there is that events technically can lag for days or even
months in some cases, but we only care about including the events that lag
for 30 minutes or so, and would like events that lag longer than that to be
ignored - I just want to make sure that doesn't require special handling.

I also just want to make sure I'm understanding the maximum lateness
watermark correctly. Suppose a watermark gets generated, and then an
element with an older timestamp is found. My understanding was that that
element should be ignored, but from our results it looks like the late
element actually overwrites the aggregate of the on-time elements. Is this
expected behavior?

Thank you for your help!
-Ethan


Re: Issues with Event Time and Kafka

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Ethan,

I believe then it is because the Watermark and Timestamps in your
implementation are uncorrelated. A Watermark is really a marker that says
there will be no more elements with a timestamp smaller than the value of
that watermark. In your code the watermark follows the wall clock while the
timestamps come from the data, so whenever event timestamps lag behind
processing time by more than maxLateness, those elements arrive behind the
watermark and are dropped. For more info on the concept, see [1]
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks>.

In your case, as you say that events can "lag" for 30 minutes, you should
try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for
a case like yours (see the sketch below).
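
A minimal version for your Row type would look something like this (a
sketch; the class name is just illustrative):
```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class RowTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Row](Time.minutes(30)) {
  // The watermark then trails the largest timestamp seen so far by 30
  // minutes, instead of trailing the wall clock.
  override def extractTimestamp(element: Row): Long = element.minTime
}
```
You would then use .assignTimestampsAndWatermarks(new RowTimestampExtractor)
in place of the current assigner.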

Regards,
Dawid


Re: Issues with Event Time and Kafka

Posted by "ext.eformichella" <ex...@riotgames.com>.
Hi Dawid, I'm working with Max on the project.
Our code for the TimestampAndWatermarkAssigner is:
```
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimestampAndWatermarkAssigner(val maxLateness: Long)
    extends AssignerWithPeriodicWatermarks[Row] {

  // Event time is taken from the record itself.
  override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
    element.minTime
  }

  // Note: this watermark follows the wall clock, not the extracted timestamps.
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
}
```

Where Row is a class representing the incoming JSON record coming from
Kafka, which includes the timestamp.

Thanks,
-Ethan




Re: Issues with Event Time and Kafka

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Max,
How do you assign timestamps to your events (in the event-time case)? Could
you post the whole code for your TimestampAndWatermarkAssigner?

Regards,
Dawid


Re: Issues with Event Time and Kafka

Posted by "ext.mwalker" <ex...@riotgames.com>.
Hi Stephan,

The right number of events seems to leave the source and enter the windows,
but the metrics show that 0 exit the windows.

Also, I have tried both a 30-minute watermark interval and leaving it unset;
I am not sure what I am supposed to put there, and the docs seem vague about
that.

Best,

Max


Re: Issues with Event Time and Kafka

Posted by Stephan Ewen <se...@apache.org>.
Hi!

At first glance, your code looks correct for assigning the Watermarks. What
is your watermark interval in the config? (See the sketch below.)
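
For periodic assigners the interval is set on the ExecutionConfig, roughly
like this (the 1000 ms value is only an example; if I remember correctly,
enabling event time defaults it to 200 ms):
```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// The interval is in milliseconds and controls how often a periodic
// assigner's getCurrentWatermark() is polled; a 30-minute interval would
// advance the watermark only twice an hour.
env.getConfig.setAutoWatermarkInterval(1000L)
```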

Can you check with the Flink metrics (if you are using 1.2) to see how many
rows leave the source, how many enter/leave the window operators, etc?

That should help figure out why there are so few result rows...

Stephan


>