Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2016/07/08 00:57:09 UTC

[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2214

    [FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

    This Kinesis-provided timestamp is used in the following:
    1) Exposed through the KinesisDeserializationSchema for users to obtain
    2) Attached to records as the default event time timestamp
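To illustrate the first use, here is a minimal, self-contained sketch. It does not use Flink's classes. `KinesisRecordStub`, `ArrivalTimeDeserializer` and `StringWithArrivalTime` are hypothetical stand-ins. They only model the idea that the deserializer receives the record's approximate arrival timestamp next to the payload, so user code can read it or keep it with the result. Treating the timestamp as epoch milliseconds in a `long` is an assumption here. For the exact parameter list, check the connector's `KinesisDeserializationSchema` Javadoc.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Kinesis record: payload bytes plus the
// server-side approximate arrival timestamp (assumed epoch milliseconds).
final class KinesisRecordStub {
    final byte[] data;
    final String partitionKey;
    final String sequenceNumber;
    final long approximateArrivalTimestamp;

    KinesisRecordStub(byte[] data, String partitionKey, String sequenceNumber, long approximateArrivalTimestamp) {
        this.data = data;
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
    }
}

// Hypothetical deserializer interface (not Flink's API): the arrival timestamp
// is handed to the implementation together with the payload and metadata.
interface ArrivalTimeDeserializer<T> {
    T deserialize(byte[] value, String partitionKey, String seqNum, long approxArrivalTimestamp);
}

// Example implementation that keeps the arrival time alongside the decoded payload.
final class StringWithArrivalTime implements ArrivalTimeDeserializer<String> {
    @Override
    public String deserialize(byte[] value, String partitionKey, String seqNum, long approxArrivalTimestamp) {
        return new String(value, StandardCharsets.UTF_8) + "@" + approxArrivalTimestamp;
    }
}
```

The second use (attaching the same value as the record's default event-time timestamp) happens inside the fetcher. The deserializer does not have to do anything for it.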

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4019

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2214
    
----
commit c71666f53dd666d11f85b0303b586cbb38175963
Author: Gordon Tai <tz...@gmail.com>
Date:   2016-07-08T00:46:31Z

    [FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp
    
    Used in the following:
    1) Exposed through the KinesisDeserializationSchema for users to obtain
    2) Attached to records as the default event time

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2214
  
    @rmetzger can you help review this PR for the Kinesis connector too? Thanks!


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70164370
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
    
    Oh I see, right :) Okay! Thanks for fixing it for me!
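The docs text above notes that the approximate arrival timestamps may not be ascending. A custom assigner therefore usually has to tolerate some out-of-orderness. Below is a minimal, Flink-independent sketch of the common bounded-out-of-orderness approach, in which the watermark trails the highest timestamp seen so far by a fixed bound. `BoundedLatenessTracker` is a hypothetical name, and so is the 5-second bound used in the check. In Flink the same logic would live in the assigner passed to `assignTimestampsAndWatermarks()`. The predefined extractors page linked in the diff should include a ready-made variant of this idea.

```java
// Hypothetical helper modelling a periodic watermark assigner with a fixed
// out-of-orderness bound. This is not a Flink class.
final class BoundedLatenessTracker {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called per record: remembers the largest timestamp and returns the record's own timestamp.
    long onRecord(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
        return timestampMillis;
    }

    // Called periodically: the watermark lags the largest timestamp by the bound,
    // so records arriving up to that much out of order are still on time.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no records seen yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

A late record never moves the watermark backwards. It only fails to advance it.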


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059282
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
     	 * This method is called by {@link ShardConsumer}s.
     	 *
     	 * @param record the record to collect
    +	 * @param recordTimestamp timestamp to attach to the collected record
     	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
     	 *                        this index should be the returned value from
     	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
     	 *                        when the shard state was registered.
     	 * @param lastSequenceNumber the last sequence number value to update
     	 */
    -	protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    +	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
     		synchronized (checkpointLock) {
     			sourceContext.collect(record);
    --- End diff --
    
    Did you also consider passing the timestamp to the `sourceContext.collectWithTimestamp()` method, in addition to passing it to the deserialization schema?


[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2214
  
    @rmetzger Thank you for the review. Comments addressed :)


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70161839
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
    
    Thanks Robert.
    I'm not quite sure what the problem is with using `assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, and as far as I understand there's not much difference from `assignTimestamps()`, except that `assignTimestamps()` is deprecated.


[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2214
  
    +1 to merge once Travis is green.


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059358
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
     	 * This method is called by {@link ShardConsumer}s.
     	 *
     	 * @param record the record to collect
    +	 * @param recordTimestamp timestamp to attach to the collected record
     	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
     	 *                        this index should be the returned value from
     	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
     	 *                        when the shard state was registered.
     	 * @param lastSequenceNumber the last sequence number value to update
     	 */
    -	protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    +	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
     		synchronized (checkpointLock) {
     			sourceContext.collect(record);
    --- End diff --
    
    Ah, sorry, this is actually wrong. It should be using `sourceContext.collectWithTimestamp()`; I missed this.
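The sketch below shows the corrected method. It is self-contained and runs without Flink. `EmitContext` is a hypothetical, reduced stand-in for Flink's `SourceContext` that models only the two emit methods discussed here. `RecordingContext` and `FetcherSketch` are illustrative names. The per-shard state is simplified to a `String[]` of sequence numbers instead of the connector's `SequenceNumber` and shard-state types. The record is emitted with its timestamp, and the shard state is updated inside the same `synchronized (checkpointLock)` block so that a checkpoint sees both changes or neither.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced stand-in for a source context.
interface EmitContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
}

// Records what was emitted so the behaviour can be inspected.
final class RecordingContext<T> implements EmitContext<T> {
    final List<T> elements = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();

    @Override
    public void collect(T element) {
        elements.add(element);
        timestamps.add(null); // no timestamp attached
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        elements.add(element);
        timestamps.add(timestamp);
    }
}

// Sketch of the fetcher side: emit with timestamp and update state under one lock.
final class FetcherSketch<T> {
    private final Object checkpointLock = new Object();
    private final EmitContext<T> sourceContext;
    private final String[] lastSequenceNumbers; // simplified per-shard state

    FetcherSketch(EmitContext<T> sourceContext, int numShards) {
        this.sourceContext = sourceContext;
        this.lastSequenceNumbers = new String[numShards];
    }

    void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, String lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            lastSequenceNumbers[shardStateIndex] = lastSequenceNumber;
        }
    }

    String lastSequenceNumber(int shardStateIndex) {
        return lastSequenceNumbers[shardStateIndex];
    }
}
```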


[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2214
  
    Rebasing ...


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70058907
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    --- End diff --
    
    Maybe we can also add a sentence saying that users can override the timestamp if they want to extract their own event-time timestamp.
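Here is a minimal sketch of the kind of logic such an override might contain. It assumes a hypothetical payload format of `<epochMillis>,<body>`. `PayloadTimestamps.extract` is an illustrative name and not part of the connector. When the payload has no parseable prefix, it falls back to the timestamp already attached to the record, which by default would be the Kinesis approximate arrival timestamp. In Flink 1.x assigners, `extractTimestamp(element, previousElementTimestamp)` receives that attached value as its second argument, which is the role of `previousElementTimestamp` here.

```java
// Hypothetical extraction logic for payloads of the form "<epochMillis>,<body>".
// If the prefix is missing or not a number, the previously attached timestamp is kept.
final class PayloadTimestamps {
    private PayloadTimestamps() {}

    static long extract(String payload, long previousElementTimestamp) {
        int comma = payload.indexOf(',');
        if (comma <= 0) {
            return previousElementTimestamp; // no timestamp prefix
        }
        try {
            return Long.parseLong(payload.substring(0, comma).trim());
        } catch (NumberFormatException e) {
            return previousElementTimestamp; // prefix is not a number
        }
    }
}
```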


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70063862
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    --- End diff --
    
    Good point. Added!


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2214


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70103527
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
    
    There is one minor thing here: you have to do `kinesis = kinesis.assignTS()` for this to work properly.
    But I'll fix it while merging.


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70164121
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
    
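    Ah, sorry, it's not about the method name.
    I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`.
    The problem is that `assignTimestampsAndWatermarks()` does not modify `kinesis` in place; it returns a new stream with the timestamps and watermarks assigned.
    So doing
    ```java
    kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); // the returned stream is discarded
    kinesis.timeWindowAll(Time.seconds(10)); // <-- this window is applied to the original stream and won't get the watermarks
    ```
    won't work.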
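The toy model below shows why the return value has to be kept. It is not Flink code. `StreamNode`, `assignWatermarks()` and `receivesWatermarks()` are hypothetical names that only mimic the shape of the DataStream API. Each transformation returns a new node pointing at its input and never modifies the receiver. An operator therefore sees watermarks only if it is attached downstream of the node that emits them.

```java
// Toy, immutable model of a dataflow node. Each transformation returns a
// new node whose input is the receiver; the receiver itself never changes.
final class StreamNode {
    private final StreamNode input;
    private final boolean emitsWatermarks;

    private StreamNode(StreamNode input, boolean emitsWatermarks) {
        this.input = input;
        this.emitsWatermarks = emitsWatermarks;
    }

    static StreamNode source() {
        return new StreamNode(null, false);
    }

    // Mimics assignTimestampsAndWatermarks(): returns a NEW downstream node.
    StreamNode assignWatermarks() {
        return new StreamNode(this, true);
    }

    // An operator attached to this node sees watermarks only if some node
    // on its input path emits them.
    boolean receivesWatermarks() {
        for (StreamNode n = this; n != null; n = n.input) {
            if (n.emitsWatermarks) {
                return true;
            }
        }
        return false;
    }
}
```

In the real docs example, the fix Robert describes is simply to reassign: `kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());`.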


[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2214
  
    Thx for rebasing.

