Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2016/06/04 04:49:49 UTC

[GitHub] flink pull request #2071: [FLINK-4018](streaming-connectors) Configurable id...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2071

    [FLINK-4018](streaming-connectors) Configurable idle time between getRecords requests to Kinesis shards

    Along with this new configuration and the already existing `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more control over the desired throughput behaviour of the Kinesis consumer.
    
    The default idle time for this new configuration is 500 milliseconds.
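
To make the interaction between the two settings concrete, here is a rough back-of-the-envelope sketch. It assumes every getRecords call returns a full batch and ignores the time the request itself takes, so it only gives an upper bound per shard. The class and method names are illustrative and are not part of the connector.

```java
public class ShardThroughputSketch {

    /**
     * Upper bound on records per second read from one shard, assuming every
     * fetch returns a full batch and the fetch itself takes no time.
     */
    public static double maxRecordsPerSecond(int recordsPerGet, long idleMillis) {
        if (idleMillis <= 0) {
            // No idle time configured: the idle setting imposes no bound.
            return Double.POSITIVE_INFINITY;
        }
        return recordsPerGet * 1000.0 / idleMillis;
    }

    public static void main(String[] args) {
        // 100 records per fetch with a 500 ms pause between fetches.
        System.out.println(maxRecordsPerSecond(100, 500)); // prints 200.0
    }
}
```

In practice the achieved rate will be lower, since request latency and partially filled batches are not modelled here.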

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4018

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2071
    
----
commit 74fcb62f62007c0396d2ae870298f344445ae5ce
Author: Gordon Tai <go...@vm5.com>
Date:   2016-06-04T04:43:30Z

    [FLINK-4018] Add configuration for idle time between get requests to Kinesis shards

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r68420387
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java ---
    @@ -105,6 +117,8 @@ public void run() {
     
     					break;
     				} else {
    +					Thread.sleep(idleMillisBetweenFetches);
    --- End diff --
    
    Agreed. I'll change the default value of this to 0 in `KinesisConfigConstants`, and check the value here before calling sleep.



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r70064126
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -155,6 +163,15 @@ public void run() {
     					// we can close this consumer thread once we've reached the end of the subscribed shard
     					break;
     				} else {
    +					if (fetchIntervalMillis != 0) {
    +						if (LOG.isDebugEnabled()) {
    +							LOG.debug(
    +								"Consumer {} of subtask {} is sleeping for {} milliseconds before fetching the next batch of records ...",
    --- End diff --
    
    Makes sense, I'll remove it!



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    @rmetzger Rebased + addressed your comment. Please review, thanks :)



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    No problem :)
    +1 to merge once travis is green



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r68386857
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThread.java ---
    @@ -105,6 +117,8 @@ public void run() {
     
     					break;
     				} else {
    +					Thread.sleep(idleMillisBetweenFetches);
    --- End diff --
    
    I'm not sure if it's a good idea to introduce this waiting time by default.
    Imagine we are consuming slowly because downstream operations are expensive. Then we would introduce an artificial slowdown (in both latency and throughput). I would suggest:
    - Setting the default value to 0
    - Only calling sleep if the idle time is > 0.
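
The two suggestions above can be sketched roughly as follows. This is a minimal illustration, not the code that was merged: the class name, constant, and helper method are hypothetical, and only `Thread.sleep` is a real API.

```java
public class IdleBetweenFetchesSketch {

    // Hypothetical default: 0 means "do not pause between fetches".
    public static final long DEFAULT_IDLE_MILLIS = 0L;

    /**
     * Pauses only when a positive idle time is configured.
     * Returns true if a sleep was attempted, false if it was skipped.
     */
    public static boolean maybeSleep(long idleMillisBetweenFetches) {
        if (idleMillisBetweenFetches <= 0) {
            // Default case: no artificial slowdown is introduced.
            return false;
        }
        try {
            Thread.sleep(idleMillisBetweenFetches);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the caller can shut down cleanly.
            Thread.currentThread().interrupt();
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(maybeSleep(DEFAULT_IDLE_MILLIS)); // prints false
        System.out.println(maybeSleep(5));                   // prints true
    }
}
```

With the default of 0, a consumer that is already slowed down by expensive downstream operators pays no extra latency; the pause only applies when a user opts in.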



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    @rmetzger Will you have time to help merge this PR too? I think the changes are good to go.



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    @rmetzger Thanks for the quick review Robert.
    Comments are addressed. Sorry for the sloppiness in not validating the input; I should have remembered that :)



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r70059642
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -155,6 +163,15 @@ public void run() {
     					// we can close this consumer thread once we've reached the end of the subscribed shard
     					break;
     				} else {
    +					if (fetchIntervalMillis != 0) {
    +						if (LOG.isDebugEnabled()) {
    +							LOG.debug(
    +								"Consumer {} of subtask {} is sleeping for {} milliseconds before fetching the next batch of records ...",
    --- End diff --
    
    I wonder if this log statement is really necessary. It can lead to quite a lot of log entries just for sleeping. (+ there is this other DEBUG log entry on each fetch)



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    Sry, I forgot to merge it. Will do now.



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    I'll try to update this PR tomorrow.



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    No problem, thank you!



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r70065967
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -155,6 +163,15 @@ public void run() {
     					// we can close this consumer thread once we've reached the end of the subscribed shard
     					break;
     				} else {
    +					if (fetchIntervalMillis != 0) {
    --- End diff --
    
    `KinesisConfigUtil.validateConfiguration()` will check for a negative value on the client, before the tasks are submitted.
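
A client-side check of this kind might look roughly like the sketch below. It is an assumption-laden illustration rather than the actual body of `KinesisConfigUtil.validateConfiguration()`: the class name, method name, and property key are hypothetical. The point is to reject bad input early, since `Thread.sleep` throws an `IllegalArgumentException` for a negative argument once the consumer is already running.

```java
import java.util.Properties;

public class FetchIntervalValidationSketch {

    // Hypothetical key for illustration only; not the connector's real constant.
    public static final String FETCH_INTERVAL_KEY = "example.shard.fetch.interval.millis";

    /** Returns the configured interval, or throws if it is not a non-negative long. */
    public static long validateFetchInterval(Properties config) {
        // A missing key falls back to 0, i.e. no pause between fetches.
        String raw = config.getProperty(FETCH_INTERVAL_KEY, "0");
        long value;
        try {
            value = Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid value for " + FETCH_INTERVAL_KEY + ": '" + raw + "' is not a number.", e);
        }
        if (value < 0) {
            throw new IllegalArgumentException(
                "Invalid value for " + FETCH_INTERVAL_KEY + ": must be non-negative, got " + value + ".");
        }
        return value;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(FETCH_INTERVAL_KEY, "250");
        System.out.println(validateFetchInterval(config)); // prints 250
    }
}
```

Failing at submission time gives the user an immediate, readable error instead of a task failure on the cluster.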



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    Great, thank you.



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r70059792
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -155,6 +163,15 @@ public void run() {
     					// we can close this consumer thread once we've reached the end of the subscribed shard
     					break;
     				} else {
    +					if (fetchIntervalMillis != 0) {
    --- End diff --
    
    Yup. Adding the validation to `KinesisConfigUtil.validateConfiguration()`, like the other values.



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2071#discussion_r70059586
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -155,6 +163,15 @@ public void run() {
     					// we can close this consumer thread once we've reached the end of the subscribed shard
     					break;
     				} else {
    +					if (fetchIntervalMillis != 0) {
    --- End diff --
    
    There is no input type validation happening, right?
    So if a user sets a negative sleeping interval, it will probably fail.



[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2071



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    Yes, I would suggest doing this PR after the big one.



[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2071
  
    Thank you for reviewing this @rmetzger. Should I address your comment after the rework in https://github.com/apache/flink/pull/2131 is merged, and rebase this PR on that?

