You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/30 08:23:39 UTC

[1/3] flink git commit: [FLINK-8076] [kinesis] Upgrade KinesisProducer to 0.12.6 to set properties appropriately

Repository: flink
Updated Branches:
  refs/heads/master 9fc3c71f8 -> a744d4bf3


[FLINK-8076] [kinesis] Upgrade KinesisProducer to 0.12.6 to set properties appropriately

This closes #5017.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be913100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be913100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be913100

Branch: refs/heads/master
Commit: be913100d8657c7a0acdaa92557af51ab69af0ab
Parents: a246205
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Nov 14 20:31:03 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 30 16:04:12 2017 +0800

----------------------------------------------------------------------
 .../flink-connector-kinesis/pom.xml             |   2 +-
 .../kinesis/util/KinesisConfigUtil.java         |  31 ++--
 .../kinesis/util/KinesisConfigUtilTest.java     | 158 +++++--------------
 3 files changed, 52 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be913100/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index ea583f8..f91228b 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -35,7 +35,7 @@ under the License.
 	<properties>
 		<aws.sdk.version>1.11.171</aws.sdk.version>
 		<aws.kinesis-kcl.version>1.8.1</aws.kinesis-kcl.version>
-		<aws.kinesis-kpl.version>0.12.5</aws.kinesis-kpl.version>
+		<aws.kinesis-kpl.version>0.12.6</aws.kinesis-kpl.version>
 	</properties>
 
 	<packaging>jar</packaging>

http://git-wip-us.apache.org/repos/asf/flink/blob/be913100/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index cadde8d..dae7f52 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -63,7 +63,10 @@ public class KinesisConfigUtil {
 	protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
 
 	/** Default values for RateLimit. **/
-	protected static final String DEFAULT_RATE_LIMIT = "100";
+	protected static final long DEFAULT_RATE_LIMIT = 100L;
+
+	/** Default value for ThreadingModel. **/
+	protected static final KinesisProducerConfiguration.ThreadingModel DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
 
 	/** Default values for ThreadPoolSize. **/
 	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
@@ -185,33 +188,23 @@ public class KinesisConfigUtil {
 
 		validateAwsConfiguration(config);
 
-		// Override KPL default value if it's not specified by user
-		if (!config.containsKey(RATE_LIMIT)) {
-			config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
-		}
-
 		KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
 
 		kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
 
 		// we explicitly lower the credential refresh delay (default is 5 seconds)
-		// to avoid a ignorable interruption warning that occurs when shutting down the
+		// to avoid an ignorable interruption warning that occurs when shutting down the
 		// KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
 		kpc.setCredentialsRefreshDelay(100);
 
-		// Because of bug https://github.com/awslabs/amazon-kinesis-producer/issues/124
-		// KPL cannot set ThreadingModel and ThreadPoolSize using Java reflection
-		// Thus we have to set them explicitly
-		if (config.containsKey(THREADING_MODEL)) {
-			kpc.setThreadingModel(
-					KinesisProducerConfiguration.ThreadingModel.valueOf(config.getProperty(THREADING_MODEL)));
-		} else {
-			kpc.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
+		// Override default values if they aren't specified by users
+		if (!config.containsKey(RATE_LIMIT)) {
+			kpc.setRateLimit(DEFAULT_RATE_LIMIT);
 		}
-
-		if (config.containsKey(THREAD_POOL_SIZE)) {
-			kpc.setThreadPoolSize(Integer.parseInt(config.getProperty(THREAD_POOL_SIZE)));
-		} else {
+		if (!config.containsKey(THREADING_MODEL)) {
+			kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
+		}
+		if (!config.containsKey(THREAD_POOL_SIZE)) {
 			kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be913100/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index b52dce2..792d3a7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -60,60 +60,43 @@ public class KinesisConfigUtilTest {
 	}
 
 	@Test
-	public void testDefaultRateLimitInProducerConfiguration() {
+	public void testRateLimitInProducerConfiguration() {
 		Properties testConfig = new Properties();
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-
 		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(100, kpc.getRateLimit());
-	}
 
-	@Test
-	public void testCustomizedRateLimitInProducerConfiguration() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
-
-		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+		kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(150, kpc.getRateLimit());
 	}
 
 	@Test
-	public void testDefaultThreadingModelInProducerConfiguration() {
+	public void testThreadingModelInProducerConfiguration() {
 		Properties testConfig = new Properties();
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());
-	}
 
-	@Test
-	public void testCustomizedThreadingModelInProducerConfiguration() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
-		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+		kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel());
 	}
 
 	@Test
-	public void testDefaultThreadPoolSizeInProducerConfiguration() {
+	public void testThreadPoolSizeInProducerConfiguration() {
 		Properties testConfig = new Properties();
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(10, kpc.getThreadPoolSize());
-	}
 
-	@Test
-	public void testCustomizedThreadPoolSizeInProducerConfiguration() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
-		KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+		kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
 
 		assertEquals(12, kpc.getThreadPoolSize());
 	}
@@ -178,11 +161,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid AWS Credential Provider Type");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
@@ -196,11 +176,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid initial position in stream");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -212,11 +189,8 @@ public class KinesisConfigUtilTest {
 		exception.expectMessage("Please set value for initial timestamp ('"
 				+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -227,11 +201,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
 
@@ -243,11 +214,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
 
@@ -258,11 +226,8 @@ public class KinesisConfigUtilTest {
 	public void testDateStringForValidateOptionDateProperty() {
 		String timestamp = "2016-04-04T19:58:46.480-00:00";
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
 
@@ -278,11 +243,8 @@ public class KinesisConfigUtilTest {
 	public void testUnixTimestampForValidateOptionDateProperty() {
 		String unixTimestamp = "1459799926.480";
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 
@@ -299,11 +261,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
@@ -316,11 +275,8 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
@@ -333,11 +289,8 @@ public class KinesisConfigUtilTest {
 		String unixTimestamp = "2016-04-04";
 		String pattern = "yyyy-MM-dd";
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
@@ -355,10 +308,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -369,10 +319,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -383,10 +330,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -397,10 +341,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -411,10 +352,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -425,10 +363,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -439,10 +374,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -453,10 +385,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -467,10 +396,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -481,10 +407,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -495,10 +418,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -509,10 +429,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -523,10 +440,7 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
@@ -537,12 +451,18 @@ public class KinesisConfigUtilTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		Properties testConfig = getPropertiesWithRequiredFields();
 		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
+
+	private Properties getPropertiesWithRequiredFields() {
+		Properties config = new Properties();
+		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		return config;
+	}
 }


[2/3] flink git commit: [hotfix] [docs] Fix typos in State Backends doc

Posted by tz...@apache.org.
[hotfix] [docs] Fix typos in State Backends doc

This closes #5075.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a246205c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a246205c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a246205c

Branch: refs/heads/master
Commit: a246205cd48d881752f385329d6d564868e41922
Parents: 9fc3c71
Author: Cristian <me...@cristian.io>
Authored: Sun Nov 26 19:45:22 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 30 16:04:12 2017 +0800

----------------------------------------------------------------------
 docs/ops/state/state_backends.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a246205c/docs/ops/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index b32ad9f..4cda94f 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -96,9 +96,9 @@ The FsStateBackend is encouraged for:
 
 The *RocksDBStateBackend* is configured with a file system URL (type, address, path), such as "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints".
 
-The RocksDBStateBackend holds in-flight data in a [RocksDB](http://rocksdb.org) data base
+The RocksDBStateBackend holds in-flight data in a [RocksDB](http://rocksdb.org) database
 that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole
-RocksDB data base will be checkpointed into the configured file system and directory. Minimal
+RocksDB database will be checkpointed into the configured file system and directory. Minimal
 metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
 
 The RocksDBStateBackend always performs asynchronous snapshots.
@@ -113,7 +113,7 @@ The RocksDBStateBackend is encouraged for:
   - Jobs with very large state, long windows, large key/value states.
   - All high-availability setups.
 
-Note that the amount of state that you can keep is only limited by the amount of disc space available.
+Note that the amount of state that you can keep is only limited by the amount of disk space available.
 This allows keeping very large state, compared to the FsStateBackend that keeps state in memory.
 This also means, however, that the maximum throughput that can be achieved will be lower with
 this state backend.


[3/3] flink git commit: [FLINK-8149] [kinesis] Replace usages of deprecated SerializationSchema

Posted by tz...@apache.org.
[FLINK-8149] [kinesis] Replace usages of deprecated SerializationSchema

This closes #5069.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a744d4bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a744d4bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a744d4bf

Branch: refs/heads/master
Commit: a744d4bf39f8bb365cc55c034f6521a00b2aa77a
Parents: be91310
Author: yew1eb <ye...@gmail.com>
Authored: Sat Nov 25 01:48:36 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 30 16:05:04 2017 +0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 2 +-
 .../flink/streaming/connectors/kinesis/FlinkKinesisProducer.java | 2 +-
 .../connectors/kinesis/examples/ConsumeFromKinesis.java          | 2 +-
 .../connectors/kinesis/examples/ProduceIntoKinesis.java          | 2 +-
 .../kinesis/serialization/KinesisDeserializationSchema.java      | 2 +-
 .../serialization/KinesisDeserializationSchemaWrapper.java       | 2 +-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java   | 2 +-
 .../streaming/connectors/kinesis/FlinkKinesisProducerTest.java   | 4 ++--
 .../kinesis/manualtests/ManualConsumerProducerTest.java          | 2 +-
 .../kinesis/testutils/ExactlyOnceValidatingConsumerThread.java   | 2 +-
 .../kinesis/testutils/KinesisEventsGeneratorProducerThread.java  | 2 +-
 .../kinesis/testutils/TestableFlinkKinesisConsumer.java          | 2 +-
 .../connectors/kinesis/testutils/TestableKinesisDataFetcher.java | 2 +-
 13 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index a3681ec..f6a9bd1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 2256073..04cb78a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
index b1ac057..29f631d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -17,12 +17,12 @@
 
 package org.apache.flink.streaming.connectors.kinesis.examples;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
index 8d21c2c..6846018 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kinesis.examples;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.commons.lang3.RandomStringUtils;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
index b06b20f..c4be96b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kinesis.serialization;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 import java.io.IOException;
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
index 279d410..e058736 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kinesis.serialization;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 2b1fcf4..a354bb3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 702ab0b..8351f8e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -26,8 +28,6 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index a7470dc..526261c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.Collector;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
index 1336652..45866bf 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
index 699c977..89a4c0d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
index 6c91eaf..de3e51d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;

http://git-wip-us.apache.org/repos/asf/flink/blob/a744d4bf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b6f3cbc..d111546 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
@@ -26,7 +27,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;