Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/22 09:45:04 UTC

[1/3] flink git commit: [hotfix] Fix typo in Debugging Classloading documentation

Repository: flink
Updated Branches:
  refs/heads/master 345de772a -> d0debf4a8


[hotfix] Fix typo in Debugging Classloading documentation

This closes #4685.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0debf4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0debf4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0debf4a

Branch: refs/heads/master
Commit: d0debf4a8216ad7a38623811e07f2d0d08a7c73c
Parents: 6c1a946
Author: desktop <yu...@kddi-research.jp>
Authored: Wed Sep 20 12:38:26 2017 +0900
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Sep 22 11:43:54 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/debugging_classloading.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0debf4a/docs/monitoring/debugging_classloading.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md
index d69870a..c072777 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -28,7 +28,7 @@ under the License.
 ## Overview of Classloading in Flink
 
 When running Flink applications, the JVM will load various classes over time.
-These classes can be devided into two domains:
+These classes can be divided into two domains:
 
   - The **Flink Framework** domain: This includes all code in the `/lib` directory in the Flink directory.
     By default these are the classes of Apache Flink and its core dependencies.
@@ -41,12 +41,12 @@ The class loading behaves slightly different for various Flink setups:
 
 **Standalone**
 
-When starting a the Flink cluster, the JobManagers and TaskManagers are started with the Flink framework classes in the
+When starting a Flink cluster, the JobManagers and TaskManagers are started with the Flink framework classes in the
 classpath. The classes from all jobs that are submitted against the cluster are loaded *dynamically*.
 
 **YARN**
 
-YARN classloading differs between single job deploymens and sessions:
+YARN classloading differs between single job deployments and sessions:
 
   - When submitting a Flink job directly to YARN (via `bin/flink run -m yarn-cluster ...`), dedicated TaskManagers and
     JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in their classpath.
@@ -65,7 +65,7 @@ classes are loaded dynamically when the jobs are submitted.
 ## Avoiding Dynamic Classloading
 
 All components (JobManger, TaskManager, Client, ApplicationMaster, ...) log their classpath setting on startup.
-They can be found as part of the environment information at the beginnign of the log.
+They can be found as part of the environment information at the beginning of the log.
 
 When running a setup where the Flink JobManager and TaskManagers are exclusive to one particular job, one can put JAR files
 directly into the `/lib` folder to make sure they are part of the classpath and not loaded dynamically. 
@@ -76,7 +76,7 @@ Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and Ja
 result in classes being loaded only once.
 
 For setups where the job's JAR file cannot be put to the `/lib` folder (for example because the setup is a session that is
-used by multiple jobs), it may still be posible to put common libraries to the `/lib` folder, and avoid dynamic class loading
+used by multiple jobs), it may still be possible to put common libraries to the `/lib` folder, and avoid dynamic class loading
 for those.
 
 
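The parent-first behaviour described in the classloading doc above can be illustrated with plain JDK classes. In that model the AppClassLoader is the parent of the FlinkUserCodeClassLoader, so classes that are already on the classpath are loaded only once. The sketch below is a simplified model and not Flink's actual FlinkUserCodeClassLoader. It assumes that a child `URLClassLoader` with no URLs of its own can stand in for the user-code loader. It shows that a class already visible to the parent resolves to the same `Class` object when it is looked up through the child.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ParentFirstSketch {

    /**
     * Loads the named class through a child URLClassLoader whose parent is the loader
     * of this class. The child has no URLs of its own, so with the JDK's default
     * parent-first delegation every lookup is answered by the parent.
     */
    static Class<?> loadThroughChild(String className) throws Exception {
        ClassLoader parent = ParentFirstSketch.class.getClassLoader();
        try (URLClassLoader child = new URLClassLoader(new URL[0], parent)) {
            return child.loadClass(className);
        }
    }

    public static void main(String[] args) throws Exception {
        // Comparable to the classpath that Flink components log at startup.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));

        Class<?> viaChild = loadThroughChild(ParentFirstSketch.class.getName());
        // Same Class object: the class was loaded once, by the parent loader.
        System.out.println(viaChild == ParentFirstSketch.class);
        System.out.println(viaChild.getClassLoader() == ParentFirstSketch.class.getClassLoader());
    }
}
```

Only a loader that overrides `loadClass` to look in its own URLs first (child-first) would produce a second, distinct `Class` object for the same name.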


[3/3] flink git commit: [FLINK-7508] [kinesis] Switch to POOLED ThreadingMode in FlinkKinesisProducer

[FLINK-7508] [kinesis] Switch to POOLED ThreadingMode in FlinkKinesisProducer

Kinesis Producer Library (KPL) 0.10.x used a one-new-thread-per-request
model for all requests sent to AWS Kinesis, which is very expensive.

KPL 0.12.4 introduced a new ThreadingModel, POOLED, which uses a thread
pool. This greatly improves KPL's performance and reduces resource
consumption. However, KPL still defaults to the PER_REQUEST mode, so
FlinkKinesisProducer should explicitly switch KPL's threading model to
POOLED.

This closes #4656.
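This commit adds a defaulting rule to `KinesisConfigUtil.getValidatedProducerConfiguration`. It uses `POOLED` unless `ThreadingModel` is set, and a pool size of `10` unless `ThreadPoolSize` is set. The rule can be sketched without the KPL dependency. This is a simplified, standalone model. The `ThreadingModel` enum below is a hypothetical stand-in for `KinesisProducerConfiguration.ThreadingModel`, and the sketch only resolves the two values; it does not build a real `KinesisProducerConfiguration`.

```java
import java.util.Properties;

public class ThreadingConfigSketch {

    // Hypothetical stand-in for KPL's KinesisProducerConfiguration.ThreadingModel enum.
    enum ThreadingModel { PER_REQUEST, POOLED }

    // Property keys and default pool size as introduced in KinesisConfigUtil by this commit.
    static final String THREADING_MODEL = "ThreadingModel";
    static final String THREAD_POOL_SIZE = "ThreadPoolSize";
    static final int DEFAULT_THREAD_POOL_SIZE = 10;

    /** Returns the configured threading model, or POOLED when the key is absent. */
    static ThreadingModel resolveThreadingModel(Properties config) {
        String value = config.getProperty(THREADING_MODEL);
        return value == null ? ThreadingModel.POOLED : ThreadingModel.valueOf(value);
    }

    /** Returns the configured thread pool size, or 10 when the key is absent. */
    static int resolveThreadPoolSize(Properties config) {
        String value = config.getProperty(THREAD_POOL_SIZE);
        return value == null ? DEFAULT_THREAD_POOL_SIZE : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // Nothing set: the pooled defaults apply.
        System.out.println(resolveThreadingModel(config) + " " + resolveThreadPoolSize(config)); // POOLED 10

        // Explicit overrides, as in the commented-out line of the documentation example.
        config.setProperty(THREADING_MODEL, "PER_REQUEST");
        config.setProperty(THREAD_POOL_SIZE, "15");
        System.out.println(resolveThreadingModel(config) + " " + resolveThreadPoolSize(config)); // PER_REQUEST 15
    }
}
```

As in the diff, parsing is unchecked. An unknown `ThreadingModel` string makes `Enum.valueOf` throw `IllegalArgumentException`, and a non-numeric `ThreadPoolSize` makes `Integer.parseInt` throw `NumberFormatException`.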


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/637dde88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/637dde88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/637dde88

Branch: refs/heads/master
Commit: 637dde889fe2d21ff6990749a750356d20fcd965
Parents: 345de77
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Sep 7 13:34:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Sep 22 11:43:54 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 13 ++++
 .../kinesis/FlinkKinesisProducer.java           |  4 +-
 .../kinesis/util/KinesisConfigUtil.java         | 44 +++++++++++--
 .../kinesis/util/KinesisConfigUtilTest.java     | 66 +++++++++++++++++++-
 4 files changed, 116 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/637dde88/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 3ffe1c4..aa20d3f 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -279,6 +279,10 @@ producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
 producerConfig.put("RecordTtl", "30000");
 producerConfig.put("RequestTimeout", "6000");
+producerConfig.put("ThreadPoolSize", "15");
+
+// Switch KinesisProducer's threading model
+// producerConfig.put("ThreadingModel", "PER_REQUEST");
 
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
 kinesis.setFailOnError(true);
@@ -301,6 +305,10 @@ producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
 producerConfig.put("RecordTtl", "30000");
 producerConfig.put("RequestTimeout", "6000");
+producerConfig.put("ThreadPoolSize", "15");
+
+// Switch KinesisProducer's threading model
+// producerConfig.put("ThreadingModel", "PER_REQUEST");
 
 val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
 kinesis.setFailOnError(true);
@@ -321,6 +329,11 @@ Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSche
 done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
 Otherwise, the returned stream name is used.
 
+### Threading Model
+
+Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and thread pool to execute requests to Kinesis. This limits the number of threads that KPL's native process may create, and therefore greatly lowers CPU utilizations and improves efficiency. **Thus, We highly recommend Flink users use thread-pool model.** The default thread pool size is `10`. Users can set the pool size in `java.util.Properties` instance with key `ThreadPoolSize`, as shown in the above example.
+
+Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
 
 ## Using Non-AWS Kinesis Endpoints for Testing
 

http://git-wip-us.apache.org/repos/asf/flink/blob/637dde88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 1f5e64c..286de53 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kinesis;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.InstantiationUtil;
@@ -171,8 +170,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 		super.open(parameters);
 
 		// check and pass the configuration properties
-		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
-		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
 		producer = new KinesisProducer(producerConfig);
 		callback = new FutureCallback<UserRecordResult>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/637dde88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 997191c..70975db 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -50,10 +50,23 @@ public class KinesisConfigUtil {
 	 * The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
 	 * RateLimitExceededException too frequently and breaks Flink sink as a result.
 	 **/
-	private static final String RATE_LIMIT = "RateLimit";
+	protected static final String RATE_LIMIT = "RateLimit";
+
+	/**
+	 * The threading model that KinesisProducer will use.
+	 **/
+	protected static final String THREADING_MODEL = "ThreadingModel";
+
+	/**
+	 * The maximum number of threads that the native process' thread pool will be configured with.
+	 **/
+	protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
 
 	/** Default values for RateLimit. **/
-	private static final String DEFAULT_RATE_LIMIT = "100";
+	protected static final String DEFAULT_RATE_LIMIT = "100";
+
+	/** Default values for ThreadPoolSize. **/
+	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
@@ -164,9 +177,10 @@ public class KinesisConfigUtil {
 	}
 
 	/**
-	 * Validate configuration properties for {@link FlinkKinesisProducer}.
+	 * Validate configuration properties for {@link FlinkKinesisProducer},
+	 * and return a constructed KinesisProducerConfiguration.
 	 */
-	public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) {
+	public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
 		checkNotNull(config, "config can not be null");
 
 		validateAwsConfiguration(config);
@@ -176,7 +190,27 @@ public class KinesisConfigUtil {
 			config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
 		}
 
-		return KinesisProducerConfiguration.fromProperties(config);
+		KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
+
+		kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
+
+		// Because of bug https://github.com/awslabs/amazon-kinesis-producer/issues/124
+		// KPL cannot set ThreadingModel and ThreadPoolSize using Java reflection
+		// Thus we have to set them explicitly
+		if (config.containsKey(THREADING_MODEL)) {
+			kpc.setThreadingModel(
+					KinesisProducerConfiguration.ThreadingModel.valueOf(config.getProperty(THREADING_MODEL)));
+		} else {
+			kpc.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
+		}
+
+		if (config.containsKey(THREAD_POOL_SIZE)) {
+			kpc.setThreadPoolSize(Integer.parseInt(config.getProperty(THREAD_POOL_SIZE)));
+		} else {
+			kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
+		}
+
+		return kpc;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/637dde88/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 3b00058..a576748 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -17,10 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -36,7 +37,7 @@ import static org.junit.Assert.assertEquals;
  * Tests for KinesisConfigUtil.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+@PrepareForTest(KinesisConfigUtil.class)
 public class KinesisConfigUtilTest {
 	@Rule
 	private ExpectedException exception = ExpectedException.none();
@@ -50,7 +51,66 @@ public class KinesisConfigUtilTest {
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		testConfig.setProperty("RateLimit", "unparsableLong");
 
-		KinesisConfigUtil.validateProducerConfiguration(testConfig);
+		KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testDefaultRateLimitInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(100, kpc.getRateLimit());
+	}
+
+	@Test
+	public void testCustomizedRateLimitInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
+
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(150, kpc.getRateLimit());
+	}
+
+	@Test
+	public void testDefaultThreadingModelInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());
+	}
+
+	@Test
+	public void testCustomizedThreadingModelInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel());
+	}
+
+	@Test
+	public void testDefaultThreadPoolSizeInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(10, kpc.getThreadPoolSize());
+	}
+
+	@Test
+	public void testCustomizedThreadPoolSizeInProducerConfiguration() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
+		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+		assertEquals(12, kpc.getThreadPoolSize());
 	}
 
 	@Test


[2/3] flink git commit: [FLINK-7600] [kinesis] Shorten credential update delay to avoid updateCredentials Exception

[FLINK-7600] [kinesis] Shorten credential update delay to avoid updateCredentials Exception

The updateCredentials exception is an ignorable warning. It occurs
because the default credentials refresh delay is longer than the
await-termination timeout used when shutting down KPL.

See https://github.com/awslabs/amazon-kinesis-producer/issues/10 for
details.

This closes #4657.
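The timing problem described above can be modelled with plain `java.util.concurrent` classes. This is a simplified sketch under stated assumptions and not KPL's actual implementation. It assumes a background loop that sleeps for the refresh delay between refreshes. It also assumes a shutdown that waits for a bounded timeout before interrupting that loop. When the delay is shorter than the timeout, the loop wakes up and exits on its own. When the delay is longer, the loop is still asleep at the deadline and has to be interrupted, which is the kind of interruption the warning reports.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RefreshShutdownSketch {

    /**
     * Runs a hypothetical refresh loop that sleeps refreshDelayMillis between refreshes,
     * then shuts it down, waiting at most awaitTimeoutMillis before interrupting it.
     * Returns true if the loop exited on its own, false if it had to be interrupted.
     */
    static boolean shutsDownCleanly(long refreshDelayMillis, long awaitTimeoutMillis)
            throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(() -> {
            while (running.get()) {
                // A real loop would refresh credentials here (omitted in this model).
                started.countDown();
                try {
                    Thread.sleep(refreshDelayMillis);
                } catch (InterruptedException e) {
                    // In this model, this is where an interruption warning would be logged.
                    interrupted.set(true);
                    return;
                }
            }
        });

        started.await();           // wait until the loop has begun its first iteration
        running.set(false);        // ask the loop to stop after its current sleep
        executor.shutdown();       // does not interrupt the running task
        if (!executor.awaitTermination(awaitTimeoutMillis, TimeUnit.MILLISECONDS)) {
            executor.shutdownNow();                          // interrupts the sleeping thread
            executor.awaitTermination(1, TimeUnit.SECONDS);  // let it record the interruption
        }
        return !interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Short refresh delay: the loop wakes up and exits before the timeout.
        System.out.println(shutsDownCleanly(50, 2_000));
        // Refresh delay longer than the timeout: the loop must be interrupted.
        System.out.println(shutsDownCleanly(10_000, 200));
    }
}
```

Lowering the delay (the commit uses 100 ms instead of the 5 second default) shifts real deployments toward the first case.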


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c1a9465
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c1a9465
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c1a9465

Branch: refs/heads/master
Commit: 6c1a946562ec8ac4825b871aefdec040cc02aaf2
Parents: 637dde8
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Sep 7 14:36:28 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Sep 22 11:43:54 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kinesis/util/KinesisConfigUtil.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c1a9465/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 70975db..cadde8d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -194,6 +194,11 @@ public class KinesisConfigUtil {
 
 		kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
 
+		// we explicitly lower the credential refresh delay (default is 5 seconds)
+		// to avoid a ignorable interruption warning that occurs when shutting down the
+		// KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
+		kpc.setCredentialsRefreshDelay(100);
+
 		// Because of bug https://github.com/awslabs/amazon-kinesis-producer/issues/124
 		// KPL cannot set ThreadingModel and ThreadPoolSize using Java reflection
 		// Thus we have to set them explicitly