You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Bowen Li (JIRA)" <ji...@apache.org> on 2017/07/14 20:39:00 UTC

[jira] [Commented] (FLINK-6365) Adapt default values of the Kinesis connector

    [ https://issues.apache.org/jira/browse/FLINK-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088020#comment-16088020 ] 

Bowen Li commented on FLINK-6365:
---------------------------------

We ran into this issue, too. 100 is too small for SHARD_GETRECORDS_MAX, and we got exceptions like:


{code:java}
2017-07-14 19:53:02,995 INFO  org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient  - Unable to execute HTTP request: Broken pipe (Write failed)
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
	at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
	at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:886)
	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:857)
	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:727)
	at sun.security.ssl.Handshaker.sendChangeCipherSpec(Handshaker.java:1124)
	at sun.security.ssl.ClientHandshaker.sendChangeCipherAndFinish(ClientHandshaker.java:1216)
	at sun.security.ssl.ClientHandshaker.serverHelloDone(ClientHandshaker.java:1128)
	at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:348)
	at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
	at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
	at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)
	at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:134)
	at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)
	at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:328)
	at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)
	at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)
	at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:936)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:199)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:292)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:200)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
{code}


I'll take this ticket, and propose a default value (5000 or 10000) based on my use case with Flink and Kinesis.

> Adapt default values of the Kinesis connector
> ---------------------------------------------
>
>                 Key: FLINK-6365
>                 URL: https://issues.apache.org/jira/browse/FLINK-6365
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.2.0
>            Reporter: Steffen Hausmann
>            Assignee: Bowen Li
>            Priority: Minor
>
> As discussed in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-SHARD-GETRECORDS-MAX-default-value-td12332.html, it seems reasonable to change the default values of the Kinesis connector to follow KCL’s default settings. I suggest to adapt at least the values for SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS. 
> As a Kinesis shard is currently limited to 5 get operations per second, you can observe high ReadProvisionedThroughputExceeded rates with the current default value for SHARD_GETRECORDS_INTERVAL_MILLIS of 0; it seem reasonable to increase it to 200. As it's described in the email thread, it seems furthermore desirable to increase the default value for SHARD_GETRECORDS_MAX to 10000.
> The values that are used by the KCL can be found here: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
> Thanks for looking into this!
> Steffen



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)