Posted to dev@flink.apache.org by Kailash Dayanand <kd...@lyft.com> on 2018/03/01 06:38:32 UTC

Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Based on the discussion here <https://lists.apache.org/thread.html/7b5f8034127c303414927232835990c2573ea63abda9dd9b8f2a261f@%3Cdev.flink.apache.org%3E>, I want to propose using the new ListShards API instead of DescribeStream on AWS to overcome the rate limits currently imposed on DescribeStream. ListShards has a much higher rate limit (100 transactions per second per data stream, see link <https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html>). It was recently introduced in aws-sdk-java release 1.11.272 <https://github.com/aws/aws-sdk-java/releases/tag/1.11.272>. I propose bumping the aws-sdk-java version used in the flink-kinesis connector and replacing the DescribeStream calls with ListShards in the KinesisProxy class here <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java>, allowing for a faster shard discovery rate.

Thanks
Kailash
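
A minimal sketch of the paginated listing that this proposal implies, assuming the ListShards contract described in the AWS API reference linked above: the first call names the stream, a response may carry a continuation token, and follow-up calls pass only that token. ShardPage, ShardLister, ShardDiscovery and fakeClient are hypothetical names used for illustration only; they stand in for the real SDK client and are not part of aws-sdk-java or of Flink's KinesisProxy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical page of results: shard IDs plus an optional continuation token. */
final class ShardPage {
    final List<String> shardIds;
    final String nextToken; // null when there are no more pages

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

/** Hypothetical stand-in for the SDK client; a real implementation would call ListShards. */
interface ShardLister {
    // Exactly one of streamName / nextToken is non-null, mirroring the documented contract.
    ShardPage listShards(String streamName, String nextToken);
}

final class ShardDiscovery {
    /** Follows continuation tokens until the service reports no further pages. */
    static List<String> listAllShards(ShardLister client, String streamName) {
        List<String> all = new ArrayList<>();
        ShardPage page = client.listShards(streamName, null);
        all.addAll(page.shardIds);
        while (page.nextToken != null) {
            page = client.listShards(null, page.nextToken);
            all.addAll(page.shardIds);
        }
        return all;
    }

    /** In-memory fake that serves two shards per page; for demonstration only. */
    static ShardLister fakeClient(final List<String> shards) {
        return new ShardLister() {
            @Override
            public ShardPage listShards(String streamName, String nextToken) {
                int start = nextToken == null ? 0 : Integer.parseInt(nextToken);
                int end = Math.min(start + 2, shards.size());
                String next = end < shards.size() ? String.valueOf(end) : null;
                return new ShardPage(new ArrayList<>(shards.subList(start, end)), next);
            }
        };
    }
}
```

In the connector itself, the ShardLister role would presumably be played by the Kinesis client that KinesisProxy already wraps; the loop structure is the part this sketch is meant to show.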

Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Btw, could you also open a JIRA to track this improvement? Thanks!


On 5 March 2018 at 5:47:09 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:

Hi,

+1, I think the new ListShards API should provide all the information we need.

In the past, we were restricted from upgrading the SDK version because it would break state compatibility (the consumer used to write AWS classes into state).
I think this has been fixed since Flink 1.3, so it should be safe to upgrade the SDK version.

+1 to the checks that Bowen recommended. To be on the safer side, checking that migrating from a Flink 1.3+ savepoint succeeds after the SDK upgrade would also be good.

Cheers,
Gordon


Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

+1, I think the new ListShards API should provide all the information we need.

In the past, we were restricted from upgrading the SDK version because it would break state compatibility (the consumer used to write AWS classes into state).
I think this has been fixed since Flink 1.3, so it should be safe to upgrade the SDK version.

+1 to the checks that Bowen recommended. To be on the safer side, checking that migrating from a Flink 1.3+ savepoint succeeds after the SDK upgrade would also be good.

Cheers,
Gordon

On 5 March 2018 at 3:39:15 PM, Bowen Li (bowenli86@gmail.com) wrote:

If ListShards() gives all the info that Flink needs, +1 on switching.
DescribeStream() has a limit of 5 requests/sec, which is pretty
bad....

But I believe the goal of switching APIs should be *making Flink jobs that
read from Kinesis more stable*, rather than having a faster shard discovery
rate. The default shard discovery interval is 10s, which is already very
fast and can satisfy most Kinesis users; we shouldn't shorten the default
value any further. Developers who want a faster discovery rate than 10s
should override the default value themselves.


I last bumped up AWS SDK version in flink-connector-kinesis. So from my 
experience, I'd recommend checking: 

1. Make sure the new SDK works with both KCL and KPL in Flink.
   Unfortunately, the AWS SDK versions in KCL and KPL are not well aligned...
   1.1 If necessary, bump their versions too.
2. It might need testing on AWS EMR. The AWS SDK is at 1.11.267 in the latest
   EMR (5.12.0), so compatibility needs to be tested.


Bowen 
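
Bowen's point about overriding the default discovery interval can be sketched as follows. This is a hedged illustration using only java.util.Properties: the key string is believed to match ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS in the connector, but that is an assumption and should be checked against the Flink version in use. The class DiscoveryIntervalConfig and its methods are hypothetical helpers, not connector API.

```java
import java.util.Properties;

final class DiscoveryIntervalConfig {
    // Assumed key; believed to match ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS.
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
    // The 10s default mentioned in the thread.
    static final long DEFAULT_INTERVAL_MILLIS = 10_000L;

    /** Returns a copy of the given properties with an explicit discovery interval. */
    static Properties withDiscoveryInterval(Properties base, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive: " + intervalMillis);
        }
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(intervalMillis));
        return copy;
    }

    /** Reads the configured interval, falling back to the 10s default when unset. */
    static long effectiveInterval(Properties config) {
        return Long.parseLong(
            config.getProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(DEFAULT_INTERVAL_MILLIS)));
    }
}
```

The resulting Properties object would then be handed to the Kinesis consumer together with the usual region and credential settings; leaving the key unset keeps the 10s default.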



On Fri, Mar 2, 2018 at 8:47 AM, Thomas Weise <th...@apache.org> wrote: 

> It will be good to be able to use the ListShards API. Are there any 
> concerns bumping up the AWS SDK dependency? I see it was last done in 
> https://issues.apache.org/jira/browse/FLINK-7422 
> 
> Thanks 
> 

Re:Re: Incorrect Rest URL

Posted by mingleizhang <zm...@163.com>.

Hello, Gary.

Sorry, I forgot to tell you the Flink version I am using: it is 1.3.2. I had not looked at the URL documented for 1.3.2.

By the way, I will take a look at the new bug.

Best
Minglei

At 2018-03-08 16:29:34, "Gary Yao" <ga...@data-artisans.com> wrote:
>Hi Minglei,
>
>What Flink version are you using? The path was only recently changed to
>/jobs/overview [1], and you linked to the documentation of the snapshot
>version.
>While I tested the feature, I discovered another bug in 1.5, though [2].
>
>Best,
>Gary
>
>[1] https://issues.apache.org/jira/browse/FLINK-7806
>[2] https://issues.apache.org/jira/browse/FLINK-8894
>
>

Re:Re: Incorrect Rest URL

Posted by mingleizhang <zm...@163.com>.
I can get the result with /joboverview under 1.3.2. Thanks!

At 2018-03-08 16:29:34, "Gary Yao" <ga...@data-artisans.com> wrote:
>Hi Minglei,
>
>What Flink version are you using? The path was only recently changed to
>/jobs/overview [1], and you linked to the documentation of the snapshot
>version.
>While I tested the feature, I discovered another bug in 1.5, though [2].
>
>Best,
>Gary
>
>[1] https://issues.apache.org/jira/browse/FLINK-7806
>[2] https://issues.apache.org/jira/browse/FLINK-8894
>
>

Re: Incorrect Rest URL

Posted by Gary Yao <ga...@data-artisans.com>.
Hi Minglei,

What Flink version are you using? The path was only recently changed to
/jobs/overview [1], and you linked to the documentation of the snapshot
version.
While I tested the feature, I discovered another bug in 1.5, though [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-7806
[2] https://issues.apache.org/jira/browse/FLINK-8894
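
The version dependence described above can be sketched as a small helper. It assumes that the new /jobs/overview path applies from Flink 1.5 on; the thread only says the path was "recently changed" and that /joboverview works on 1.3.2, so the 1.5 cut-off is an assumption. JobsOverviewPath and its methods are hypothetical names, not Flink API.

```java
final class JobsOverviewPath {
    /** Picks the jobs-overview REST path for a version string such as "1.3.2" or "1.5-SNAPSHOT". */
    static String overviewPath(String flinkVersion) {
        String[] parts = flinkVersion.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch], got: " + flinkVersion);
        }
        int major = Integer.parseInt(leadingDigits(parts[0]));
        int minor = Integer.parseInt(leadingDigits(parts[1]));
        // Assumed cut-off: 1.5 and later use the new path (see FLINK-7806).
        boolean newLayout = major > 1 || (major == 1 && minor >= 5);
        return newLayout ? "/jobs/overview" : "/joboverview";
    }

    /** Builds the full URL; a single slash separates authority and path. */
    static String overviewUrl(String host, int port, String flinkVersion) {
        return "http://" + host + ":" + port + overviewPath(flinkVersion);
    }

    // Drops suffixes such as "-SNAPSHOT" so that "5-SNAPSHOT" parses as 5.
    private static String leadingDigits(String s) {
        return s.replaceAll("[^0-9].*$", "");
    }
}
```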


On Wed, Mar 7, 2018 at 4:18 AM, mingleizhang <zm...@163.com> wrote:

>
>
> I expected it to respond as the documentation says, like the following, but it does not.
>
>
>
> Sample Result:
>
> {
>   "jobs":[
>     {
>       "jid": "7684be6004e4e955c2a558a9bc463f65",
>       "name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
>       "state": "FINISHED",
>       "start-time": 1442419702857,
>       "end-time": 1442419975312,
>       "duration":272455,
>       "last-modification": 1442419975312,
>       "tasks": {
>          "total": 6,
>          "pending": 0,
>          "running": 0,
>          "finished": 6,
>          "canceling": 0,
>          "canceled": 0,
>          "failed": 0
>       }
>     },
>     {
>       "jid": "49306f94d0920216b636e8dd503a6409",
>       "name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
>       ...
>     }]
> }
>
>
>
>
>
>
>

Re:Incorrect Rest URL

Posted by mingleizhang <zm...@163.com>.

I expected it to respond as the documentation says, like the following, but it does not.



Sample Result:

{
  "jobs":[
    {
      "jid": "7684be6004e4e955c2a558a9bc463f65",
      "name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
      "state": "FINISHED",
      "start-time": 1442419702857,
      "end-time": 1442419975312,
      "duration":272455,
      "last-modification": 1442419975312,
      "tasks": {
         "total": 6,
         "pending": 0,
         "running": 0,
         "finished": 6,
         "canceling": 0,
         "canceled": 0,
         "failed": 0
      }
    },
    {
      "jid": "49306f94d0920216b636e8dd503a6409",
      "name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
      ...
    }]
}







At 2018-03-07 11:14:11, "mingleizhang" <zm...@163.com> wrote:
>I am not sure whether this is an incorrect REST URL, but I think it is.
>
>
>https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html
>Overview of Jobs
>/jobs/overview
>
>I accessed the URL with:
>
>http://10.201.202.217:9081//jobs/overview
>
>It gives me:
>
>java.lang.RuntimeException: Invalid JobID string 'overview': contains illegal character for hexBinary: overview
>
>	at org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleJsonRequest(AbstractExecutionGraphRequestHandler.java:53)
>	at org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.java:41)
>	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:109)
>	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:97)
>	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
>	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>	at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
>	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>
>
>Other than that, all works well.
>http://10.201.202.217:9081///config works fine.
>http://10.201.202.217:9081////overview works fine.
>
>
>thanks
>Minglei.  
>
>
>
>
>

Incorrect Rest URL

Posted by mingleizhang <zm...@163.com>.
I am not sure whether this is an incorrect REST URL, but I think it is.


https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html
Overview of Jobs
/jobs/overview

I accessed the URL with:

http://10.201.202.217:9081//jobs/overview

It gives me:

java.lang.RuntimeException: Invalid JobID string 'overview': contains illegal character for hexBinary: overview

	at org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleJsonRequest(AbstractExecutionGraphRequestHandler.java:53)
	at org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.java:41)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:109)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:97)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)


Other than that, all works well.
http://10.201.202.217:9081///config works fine.
http://10.201.202.217:9081////overview works fine.


thanks
Minglei.  
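
One way to read the exception above: the handler in the stack trace treats the path segment after /jobs/ as a job ID, and "overview" is not valid hexadecimal. The sketch below illustrates that check under the assumption that a job ID is a 32-character hex string, as the sample "jid" values in this thread suggest. JobIdCheck is a hypothetical helper and not the parser Flink actually uses.

```java
final class JobIdCheck {
    /** True when the segment has the shape of the sample job IDs: 32 hexadecimal characters. */
    static boolean looksLikeJobId(String segment) {
        return segment != null && segment.matches("[0-9a-fA-F]{32}");
    }
}
```

Under this reading, a request to /jobs/overview on 1.3.2 fails because the literal segment "overview" is handed to the job-ID parser instead of being routed to an overview handler.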






Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by Kailash Dayanand <kd...@lyft.com>.
Hello Bowen,

Thank you very much for your feedback. I have a question regarding your
concern about compatibility issues.

I noticed that the aws-java-sdk imported in the Flink connector is shaded, so
it is unlikely to cause classpath conflicts when AWS EMR depends on an older
version of the SDK.

KCL and KPL also depend on the aws-java-sdk. Since the minimum aws-java-sdk
version required by KCL and KPL is earlier than the proposed version, this
should not be a problem. I checked whether any changes break backward
compatibility between the new version and the minimum version required by
KCL and KPL, and it looks like there are none.

Do you think there is something else which I am missing here?

Thanks
Kailash



On Mon, Mar 5, 2018 at 3:20 PM, Bowen Li <bo...@gmail.com> wrote:

> Good that AWS has increased the limit from 5 txn/s last year to 10 txn/s.
>

Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by Bowen Li <bo...@gmail.com>.
Good that AWS has increased the limit from 5 txn/s last year to 10 txn/s.

On Mon, Mar 5, 2018 at 8:32 AM, Thomas Weise <th...@apache.org> wrote:

> On Sun, Mar 4, 2018 at 11:38 PM, Bowen Li <bo...@gmail.com> wrote:
>
> > If ListShards() gives all the info that Flink needs, +1 on switching.
> > DescribeStreams() has a limitation of 5 requests/sec, which is pretty
> > bad....
> >
> > But, I believe the goal of switching APIs should be *making Flink jobs
> > that read from Kinesis more stable*, rather than having faster shard
> > discovery rate. The default shard discovery rate is every 10s, which is
> > already very very fast and can satisfy most Kinesis users, we shouldn't
> > shorten the default value anymore. Developers who want faster discovery
> > rate than 10s should overwrite the default value themselves.
> >
> >
> https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
>
> "This operation has a limit of 10 transactions per second per account."
>
> 10s interval would be sufficient, but with 5 requests per second and a
> larger number of subtasks and multiple applications in the account, we are
> looking at discovery rates in double digit minutes.
>
> That's where ListShards will help. Of course, the consumer should still be
> redesigned to centralize the discovery.
>
> Thanks,
> Thomas
>

Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by Thomas Weise <th...@apache.org>.
On Sun, Mar 4, 2018 at 11:38 PM, Bowen Li <bo...@gmail.com> wrote:

> If ListShards() gives all the info that Flink needs, +1 on switching.
> DescribeStreams() has a limitation of 5 requests/sec, which is pretty
> bad....
>
> But, I believe the goal of switching APIs should be *making Flink jobs that
> read from Kinesis more stable*, rather than having faster shard discovery
> rate. The default shard discovery rate is every 10s, which is already very
> very fast and can satisfy most Kinesis users, we shouldn't shorten the
> default value anymore. Developers who want faster discovery rate than 10s
> should overwrite the default value themselves.
>
>
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html

"This operation has a limit of 10 transactions per second per account."

A 10s interval would be sufficient, but with 5 requests per second, a
larger number of subtasks, and multiple applications in the account, we are
looking at discovery intervals in the double-digit minutes.

That's where ListShards will help. Of course, the consumer should still be
redesigned to centralize the discovery.

Thanks,
Thomas
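
To put rough numbers on the point above, here is a back-of-the-envelope
sketch in Python. It assumes that every consumer subtask issues its own
DescribeStream call for each stream it reads, and that all applications share
one account-wide request limit. The function name and the workload figures
(10 applications, 100 subtasks each, 3 streams each) are hypothetical and
chosen only for illustration.

```python
def min_discovery_round_seconds(apps, subtasks_per_app, streams_per_app,
                                calls_per_stream, limit_per_second):
    """Lower bound on the time for every subtask to finish one discovery pass.

    Assumes each subtask calls the API independently for each stream and that
    all calls count against a single shared per-account limit.
    """
    total_calls = apps * subtasks_per_app * streams_per_app * calls_per_stream
    return total_calls / limit_per_second


# Hypothetical account: 10 applications, 100 subtasks each, 3 streams each,
# one call per stream per pass.
at_5_per_sec = min_discovery_round_seconds(10, 100, 3, 1, 5)
at_10_per_sec = min_discovery_round_seconds(10, 100, 3, 1, 10)

print(at_5_per_sec / 60)   # 10.0 minutes at 5 requests/sec
print(at_10_per_sec / 60)  # 5.0 minutes at 10 requests/sec
```

Under these assumptions a single discovery pass already takes minutes rather
than the configured 10s, and throttled retries would only lengthen it.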

Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by Bowen Li <bo...@gmail.com>.
If ListShards() gives all the info that Flink needs, +1 on switching.
DescribeStream() has a limit of 5 requests/sec, which is pretty bad.

But I believe the goal of switching APIs should be *making Flink jobs that
read from Kinesis more stable*, rather than a faster shard discovery rate.
The default shard discovery interval is 10s, which is already very fast and
can satisfy most Kinesis users, so we shouldn't shorten the default value any
further. Developers who want a discovery interval shorter than 10s should
override the default value themselves.


I was the last to bump the AWS SDK version in flink-connector-kinesis. From
my experience, I'd recommend checking:

   1. Make sure the new SDK works with both KCL and KPL in Flink.
   Unfortunately, the AWS SDK versions in KCL and KPL are not well aligned.
      1. If necessary, you need to bump their versions too.
   2. You might need to test it on AWS EMR. The AWS SDK is at 1.11.267 in
   the latest EMR (5.12.0), so compatibility needs to be tested.


Bowen



On Fri, Mar 2, 2018 at 8:47 AM, Thomas Weise <th...@apache.org> wrote:

> It will be good to be able to use the ListShards API. Are there any
> concerns bumping up the AWS SDK dependency? I see it was last done in
> https://issues.apache.org/jira/browse/FLINK-7422
>
> Thanks

Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

Posted by Thomas Weise <th...@apache.org>.
It will be good to be able to use the ListShards API. Are there any
concerns about bumping up the AWS SDK dependency? I see it was last done in
https://issues.apache.org/jira/browse/FLINK-7422

Thanks

On Wed, Feb 28, 2018 at 10:38 PM, Kailash Dayanand <kd...@lyft.com>
wrote:

> Based on the discussion at here
> <https://lists.apache.org/thread.html/7b5f8034127c303414927232835990c2573ea63abda9dd9b8f2a261f@%3Cdev.flink.apache.org%3E>,
> I want to propose using the latest ListShards API instead of the
> DescribeStreams on AWS to overcome the rate limits currently imposed on
> DescribeStream. The new List Shards have a much higher rate limits (a
> limit of 100 transactions per second per data stream link
> <https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html>).
> This was recently introduced in the aws-sdk-java release of 1.11.272
> <https://github.com/aws/aws-sdk-java/releases/tag/1.11.272>. I propose
> bumping up the aws-sdk-java used in flink-kinesis connector and replace the
> DescribeStream calls with ListShards in the KinesisProxy class here
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java> allowing
> for faster shard discovery rate.
>
> Thanks
> Kailash
>
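
For readers unfamiliar with ListShards, the following is a minimal sketch of
how its pagination works. It is not the KinesisProxy code from the proposal
in this thread, which uses the Java SDK. It is written in Python and assumes
`client` behaves like a boto3 Kinesis client, whose `list_shards` call takes
`StreamName` on the first request and only `NextToken` on follow-up requests.
The helper name `list_all_shards` is hypothetical.

```python
def list_all_shards(client, stream_name):
    """Collect every shard of a stream by following ListShards pagination.

    `client` is assumed to expose list_shards(**kwargs) the way a boto3
    Kinesis client does, returning a dict with "Shards" and, while more
    pages remain, "NextToken".
    """
    shards = []
    # The first request names the stream; later requests carry only the
    # token, because the API rejects StreamName combined with NextToken.
    kwargs = {"StreamName": stream_name}
    while True:
        response = client.list_shards(**kwargs)
        shards.extend(response["Shards"])
        token = response.get("NextToken")
        if not token:
            return shards
        kwargs = {"NextToken": token}
```

With boto3 installed and credentials configured, this could be called as
list_all_shards(boto3.client("kinesis"), "example-stream"), where the stream
name is a placeholder. Throttling and retry handling are deliberately left
out of the sketch.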