Posted to issues@flink.apache.org by bowenli86 <gi...@git.apache.org> on 2017/09/07 20:39:21 UTC

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/4656

    [FLINK-7508][kinesis] switch FlinkKinesisProducer to use KPL's POOLED threading model rather than the PER_REQUEST threading model

    ## What is the purpose of the change
    
    Kinesis Producer Library (KPL) 0.10.x uses a one-new-thread-per-request model for all requests sent to AWS Kinesis, which is very expensive.
    
    KPL 0.12.4 introduced a new threading model, POOLED, which uses a thread pool. This greatly improves KPL's performance and reduces resource consumption. However, KPL still defaults to the PER_REQUEST model, so FlinkKinesisProducer should explicitly switch KPL's threading model to POOLED.
    This work depends on FLINK-7366 and FLINK-7508.
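As a rough illustration of why the two models differ in cost, the Java sketch below contrasts starting a brand-new thread per request with handing requests to a fixed-size pool. It is only an analogy under stated assumptions: KPL performs its actual request dispatch inside its native daemon rather than in Java, and the class and method names here are hypothetical. Each method reports how many distinct threads ended up doing the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical Java analogy of PER_REQUEST vs POOLED dispatch (not KPL code). */
final class ThreadingModelSketch {

    private ThreadingModelSketch() {}

    /** PER_REQUEST-style: every request pays for creating and tearing down its own thread. */
    static int distinctThreadsPerRequest(int requests) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[requests];
        for (int i = 0; i < requests; i++) {
            threads[i] = new Thread(() -> seen.add(Thread.currentThread()));
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // wait for every per-request thread to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    /** POOLED-style: every request is queued and run by one of a fixed set of reusable workers. */
    static int distinctThreadsPooled(int requests, int poolSize) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> seen.add(Thread.currentThread()));
        }
        pool.shutdown(); // accept no new work; queued requests still run
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }
}
```

With 100 requests, the per-request version starts 100 threads, while the pooled version with a pool size of 10 reuses 10 workers. The pooled thread count stays bounded no matter how much load arrives, which is the effect this PR's benchmark measures at a much larger scale.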
    
    Benchmarks I ran:
    
    - Environment: a Flink job with an hourly sliding window, running on an 18-node EMR cluster of r4.2xlarge instances. Each hourly sliding window emits about 21 million UserRecords, so the test load is a burst of 21 million UserRecords in the first minute of each hour.
    - Criterion: KPL throughput per minute. Since KPL's default RecordTtl is 30 seconds, either all UserRecords are sent within a minute or we see UserRecord expiration errors.
    - One-new-thread-per-request model: maximum throughput was about 2 million UserRecords per minute. It couldn't go beyond that because CPU utilization reached 100%, at which point everything stopped working and the Flink job crashed.
    - Thread-pool model with a pool size of 10: it sent all 21 million UserRecords within 30 seconds without any UserRecord expiration errors, with an average peak CPU utilization of about 20-30%. So 21 million UserRecords/min is not the maximum throughput of the thread-pool model. We didn't push further because 1) this throughput is already a couple of times more than what we actually need, and 2) we don't have a quick way to increase the test load.
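The benchmark figures can be turned into a quick sanity check. The sketch below assumes, as a simplification, that each hourly burst of 21 million UserRecords is enqueued all at once and must drain before the 30-second default RecordTtl elapses; the class and constant names are hypothetical, and the numbers are the ones quoted above.

```java
/** Back-of-the-envelope rates derived from the benchmark figures (hypothetical helper). */
final class BenchmarkMath {

    private BenchmarkMath() {}

    static final double BURST_RECORDS = 21_000_000.0;             // UserRecords per hourly window
    static final double DEFAULT_RECORD_TTL_SECONDS = 30.0;        // KPL default RecordTtl, per the text
    static final double PER_REQUEST_MAX_PER_MINUTE = 2_000_000.0; // observed PER_REQUEST ceiling

    /** Sustained rate needed to drain the whole burst before the TTL elapses. */
    static double requiredRatePerSecond() {
        return BURST_RECORDS / DEFAULT_RECORD_TTL_SECONDS;
    }

    /** Seconds needed to drain the burst at the observed PER_REQUEST ceiling. */
    static double perRequestDrainSeconds() {
        // Multiply before dividing so the result stays exact in double arithmetic.
        return BURST_RECORDS * 60.0 / PER_REQUEST_MAX_PER_MINUTE;
    }
}
```

Under that simplification, the burst needs a sustained 700,000 records/s, while draining it at the observed PER_REQUEST ceiling of about 2 million records/min would take about 630 seconds, roughly 21 times the TTL.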
    
    Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
    
    ## Brief change log
    
      - *switch FlinkKinesisProducer from KPL's PER_REQUEST threading model to the POOLED threading model*
      - *update docs*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - *added unit tests in flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java*
    
    ## Does this pull request potentially affect one of the following parts:
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-7508

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4656
    
----
commit 6386983239bd3024b395c865ec4fd33e232ca5a3
Author: Bowen Li <bo...@gmail.com>
Date:   2017-08-30T16:35:03Z

    FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis

commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
Author: Bowen Li <bo...@gmail.com>
Date:   2017-09-07T06:33:37Z

    Merge remote-tracking branch 'upstream/master'

commit 893ec61bebfa20a038819bf1929791e57b98f33b
Author: Bowen Li <bo...@gmail.com>
Date:   2017-09-07T20:34:09Z

    FLINK-7508 switch FlinkKinesisProducer to use KPL's ThreadingMode to threaded-pool mode rather than one_thread_per_request mode

----


---

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4656


---

[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4656
  
    Thanks :)
    
    LGTM! Merging this ..


---

[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4656
  
    @tzulitai Hi Gordon, can you please take a look at this PR?


---

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4656#discussion_r138836822
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---
    @@ -36,7 +37,7 @@
      * Tests for KinesisConfigUtil.
      */
     @RunWith(PowerMockRunner.class)
    -@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
    +@PrepareForTest({KinesisConfigUtil.class})
    --- End diff --
    
    Can remove the now unnecessary `{}`.
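For context on this nit: when an annotation element has an array type, such as the `value` element of PowerMock's `@PrepareForTest`, Java allows a single-element array to be written without braces, so `@PrepareForTest(KinesisConfigUtil.class)` is equivalent to the braced form. The sketch below demonstrates the rule with a hypothetical `@PrepareLike` annotation standing in for `@PrepareForTest`.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/** Hypothetical stand-in for an annotation whose value element is a Class<?>[]. */
@Retention(RetentionPolicy.RUNTIME)
@interface PrepareLike {
    Class<?>[] value();
}

final class AnnotationShorthand {

    private AnnotationShorthand() {}

    @PrepareLike({String.class}) // explicit array initializer
    static final class WithBraces {}

    @PrepareLike(String.class)   // single-element shorthand: braces omitted
    static final class WithoutBraces {}

    /** Reads the annotation's array value at runtime. */
    static Class<?>[] valueOf(Class<?> annotated) {
        return annotated.getAnnotation(PrepareLike.class).value();
    }
}
```

Both declarations produce the same one-element array, so dropping the braces is purely cosmetic.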


---

[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4656
  
    @tzulitai Thank you! 


---

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4656#discussion_r138837100
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception {
     		super.open(parameters);
     
     		// check and pass the configuration properties
    -		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
    +		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
     		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    +		producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
    --- End diff --
    
    Do you think it would make sense to allow users to configure different threading models?
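One way to make the threading model configurable while keeping POOLED as the default is sketched below. It is a self-contained illustration, not the code that was merged: the `ThreadingModel` enum mirrors KPL's `KinesisProducerConfiguration.ThreadingModel`, and the property keys `ThreadingModel` and `ThreadPoolSize`, as well as the default pool size of 10 (borrowed from the benchmark), are assumptions.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical stand-in for KPL's KinesisProducerConfiguration.ThreadingModel. */
enum ThreadingModel { PER_REQUEST, POOLED }

final class ThreadingConfigSketch {

    private ThreadingConfigSketch() {}

    // Assumed property keys and defaults, for illustration only.
    static final String THREADING_MODEL = "ThreadingModel";
    static final String THREAD_POOL_SIZE = "ThreadPoolSize";
    static final ThreadingModel DEFAULT_THREADING_MODEL = ThreadingModel.POOLED;
    static final int DEFAULT_THREAD_POOL_SIZE = 10;

    /** Returns the user's threading model, falling back to POOLED when unset. */
    static ThreadingModel threadingModel(Properties props) {
        String raw = props.getProperty(THREADING_MODEL);
        if (raw == null) {
            return DEFAULT_THREADING_MODEL;
        }
        try {
            return ThreadingModel.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid " + THREADING_MODEL + " '" + raw + "'; expected PER_REQUEST or POOLED", e);
        }
    }

    /** Returns a positive pool size, falling back to the default when unset. */
    static int threadPoolSize(Properties props) {
        String raw = props.getProperty(THREAD_POOL_SIZE);
        if (raw == null) {
            return DEFAULT_THREAD_POOL_SIZE;
        }
        final int size;
        try {
            size = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid " + THREAD_POOL_SIZE + " '" + raw + "'", e);
        }
        if (size <= 0) {
            throw new IllegalArgumentException(THREAD_POOL_SIZE + " must be positive, got " + size);
        }
        return size;
    }
}
```

A user who wants the old behavior would set `ThreadingModel` to `PER_REQUEST` in the producer `Properties`; the resolved value would then be passed to KPL through `setThreadingModel(...)`, as in the diff above.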


---

[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4656
  
    @tzulitai Thanks, Gordon! I watched your presentation on "managing state of Flink", and you did a great job explaining all the details.
    
    I added an option for the PER_REQUEST model, since it doesn't hurt anything.
    
    This PR is for 1.4. There's [another PR](https://github.com/apache/flink/pull/4657) that I think needs to be merged into both 1.4 and 1.3.x.


---

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4656#discussion_r139931718
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---
    @@ -50,7 +51,66 @@ public void testUnparsableLongForProducerConfiguration() {
     		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
     		testConfig.setProperty("RateLimit", "unparsableLong");
     
    -		KinesisConfigUtil.validateProducerConfiguration(testConfig);
    +		KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
    +	}
    +
    +	@Test
    +	public void testDefaultRateLimitInProducerConfiguration() {
    +		Properties testConfig = new Properties();
    +		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    +
    +		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
    +
    +		assertEquals(100, kpc.getRateLimit());
    +	}
    +
    +	@Test
    +	public void testCustomizedRateLimitInProducerConfiguration() {
    +		Properties testConfig = new Properties();
    +		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    +		testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    +
    +		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
    +
    +		assertEquals(150, kpc.getRateLimit());
    +	}
    +
    +	@Test
    +	public void testDefaultThreadingModelInProducerConfiguration() {
    +		Properties testConfig = new Properties();
    +		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    +		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
    +
    +		assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());
    +	}
    +
    +	@Test
    +	public void testCustomizedThreadingModelSizeInProducerConfiguration() {
    --- End diff --
    
    nit: I think "Size" in the test name is redundant here. I'll remove it.


---

[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4656#discussion_r138965647
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception {
     		super.open(parameters);
     
     		// check and pass the configuration properties
    -		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
    +		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
     		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    +		producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
    --- End diff --
    
    On second thought: although I believe the POOLED model is the best option for most of the use cases I can think of, we should give users the flexibility to decide.
    
    I'm adding the PER_REQUEST model as an option.


---