You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/17 11:16:03 UTC

[1/2] flink git commit: [FLINK-9692] [kinesis] Adaptive reads from Kinesis

Repository: flink
Updated Branches:
  refs/heads/release-1.6 7a91f3071 -> c3d3ff37d


[FLINK-9692] [kinesis] Adaptive reads from Kinesis


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8005a2eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8005a2eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8005a2eb

Branch: refs/heads/release-1.6
Commit: 8005a2ebac1afbec6fcf43a4442f51f442f33590
Parents: 7a91f30
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
Authored: Tue Jul 10 11:40:02 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 17 10:07:45 2018 +0200

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |   6 ++
 .../kinesis/internals/ShardConsumer.java        |  38 ++++++-
 .../kinesis/internals/ShardConsumerTest.java    |  45 ++++++++
 .../testutils/FakeKinesisBehavioursFactory.java | 107 +++++++++++++++++++
 4 files changed, 195 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index e46f79e..bcbc284 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -134,6 +134,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The interval between each attempt to discover new shards. */
 	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
 
+	/** The config to turn on adaptive reads from a shard. */
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptive.read.records.enabled";
+
+
 	// ------------------------------------------------------------------------
 	//  Default values for consumer configuration
 	// ------------------------------------------------------------------------
@@ -179,6 +183,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
 
+	public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
+
 	/**
 	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
 	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.

http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 30f0016..77d180c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -56,6 +56,10 @@ public class ShardConsumer<T> implements Runnable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
 
+	// AWS Kinesis has a per-shard read limit of 2 MB/sec
+	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
 	private final KinesisDeserializationSchema<T> deserializer;
 
 	private final KinesisProxyInterface kinesis;
@@ -66,8 +70,9 @@ public class ShardConsumer<T> implements Runnable {
 
 	private final StreamShardHandle subscribedShard;
 
-	private final int maxNumberOfRecordsPerFetch;
+	private int maxNumberOfRecordsPerFetch;
 	private final long fetchIntervalMillis;
+	private final boolean useAdaptiveReads;
 
 	private final ShardMetricsReporter shardMetricsReporter;
 
@@ -125,6 +130,9 @@ public class ShardConsumer<T> implements Runnable {
 		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
 			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+		this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
+			Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
 
 		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
 			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
@@ -224,10 +232,19 @@ public class ShardConsumer<T> implements Runnable {
 						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
+					long recordBatchSizeBytes = 0L;
+					long averageRecordSizeBytes = 0L;
+
 					for (UserRecord record : fetchedRecords) {
+						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
+					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
+						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
+						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
+					}
+
 					nextShardItr = getRecordsResult.getNextShardIterator();
 				}
 			}
@@ -330,4 +347,23 @@ public class ShardConsumer<T> implements Runnable {
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * so that reads stay close to the 2 MB/sec per-shard limit.
+	 *
+	 * @param averageRecordSizeBytes the average size, in bytes, of the records in the last fetched batch
+	 * @return the adapted maximum number of records per fetch
+	 */
+
+	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+				adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+				// Cap at the GetRecords per-call limit (DEFAULT_SHARD_GETRECORDS_MAX, i.e. 10000)
+				adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+			}
+		return adaptedMaxRecordsPerFetch;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 4478b2f..c2924e2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -168,6 +168,51 @@ public class ShardConsumerTest {
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
 	}
 
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+		Properties consumerProperties = new Properties();
+		consumerProperties.put("flink.shard.adaptive.read.records.enabled", "true");
+
+		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+		subscribedShardsStateUnderTest.add(
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+		TestSourceContext<String> sourceContext = new TestSourceContext<>();
+
+		TestableKinesisDataFetcher<String> fetcher =
+			new TestableKinesisDataFetcher<>(
+				Collections.singletonList("fakeStream"),
+				sourceContext,
+				consumerProperties,
+				new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+				10,
+				2,
+				new AtomicReference<>(),
+				subscribedShardsStateUnderTest,
+				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+				Mockito.mock(KinesisProxyInterface.class));
+
+		new ShardConsumer<>(
+			fetcher,
+			0,
+			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+			// Initial number of records to fetch --> 10
+			FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
+			new ShardMetricsReporter()).run();
+
+		// Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
+		// Number of records fetched in second batch --> 2 MB / (10 KB * 1000 / 200 ms) ≈ 40
+		// Total number of records = 10 + 40 = 50
+		assertEquals(50, sourceContext.getCollectedOutputs().size());
+		assertEquals(
+			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+	}
+
 	private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
 		return new StreamShardHandle(
 			streamName,

http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index e403623..eb34155 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -93,6 +94,14 @@ public class FakeKinesisBehavioursFactory {
 			numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest);
 	}
 
+	public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(
+			final int numOfRecords,
+			final int numOfGetRecordsCalls,
+			final long millisBehindLatest) {
+		return new SingleShardEmittingAdaptiveNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls,
+				millisBehindLatest);
+	}
+
 	private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
 
 		private final long millisBehindLatest;
@@ -227,6 +236,104 @@ public class FakeKinesisBehavioursFactory {
 
 	}
 
+	private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis implements
+			KinesisProxyInterface {
+
+		protected final int totalNumOfGetRecordsCalls;
+
+		protected final int totalNumOfRecords;
+
+		private final long millisBehindLatest;
+
+		protected final Map<String, List<Record>> shardItrToRecordBatch;
+
+		protected static long averageRecordSizeBytes;
+
+		private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+		public SingleShardEmittingAdaptiveNumOfRecordsKinesis(final int numOfRecords,
+				final int numOfGetRecordsCalls,
+				final long millisBehindLatest) {
+			this.totalNumOfRecords = numOfRecords;
+			this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
+			this.millisBehindLatest = millisBehindLatest;
+			this.averageRecordSizeBytes = 0L;
+
+			// initialize the record batches that will be fetched
+			this.shardItrToRecordBatch = new HashMap<>();
+
+			int numOfAlreadyPartitionedRecords = 0;
+			int numOfRecordsPerBatch = numOfRecords;
+			for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
+					shardItrToRecordBatch.put(
+							String.valueOf(batch),
+							createRecordBatchWithRange(
+									numOfAlreadyPartitionedRecords,
+									numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
+					numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
+
+				numOfRecordsPerBatch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT /
+						(averageRecordSizeBytes * 1000L / ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS));
+			}
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			// assuming that the maxRecordsToGet is always large enough
+			return new GetRecordsResult()
+					.withRecords(shardItrToRecordBatch.get(shardIterator))
+					.withMillisBehindLatest(millisBehindLatest)
+					.withNextShardIterator(
+							(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+									? null : String
+									.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
+		}
+
+		@Override
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType,
+				Object startingMarker) {
+			// this will be called only one time per ShardConsumer;
+			// so, simply return the iterator of the first batch of records
+			return "0";
+		}
+
+		@Override
+		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+			return null;
+		}
+
+		public static List<Record> createRecordBatchWithRange(int min, int max) {
+			List<Record> batch = new LinkedList<>();
+			long	sumRecordBatchBytes = 0L;
+			// Create record of size 10Kb
+			String data = createDataSize(10 * 1024L);
+
+			for (int i = min; i < max; i++) {
+				Record record = new Record()
+								.withData(
+										ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+								.withPartitionKey(UUID.randomUUID().toString())
+								.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
+								.withSequenceNumber(String.valueOf(i));
+				batch.add(record);
+				sumRecordBatchBytes += record.getData().remaining();
+
+			}
+			if (batch.size() != 0) {
+				averageRecordSizeBytes = sumRecordBatchBytes / batch.size();
+			}
+
+			return batch;
+		}
+
+		private static String createDataSize(long msgSize) {
+			char[] data = new char[(int) msgSize];
+			return new String(data);
+
+		}
+
+	}
+
 	private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
 
 		private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();


[2/2] flink git commit: [FLINK-9692] [kinesis] Harmonize style of config variable names

Posted by se...@apache.org.
[FLINK-9692] [kinesis] Harmonize style of config variable names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3d3ff37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3d3ff37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3d3ff37

Branch: refs/heads/release-1.6
Commit: c3d3ff37de47e9302ba2a7397bef7933fab82f81
Parents: 8005a2e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 17 10:16:50 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 17 10:16:50 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/config/ConsumerConfigConstants.java         | 2 +-
 .../streaming/connectors/kinesis/internals/ShardConsumerTest.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3d3ff37/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index bcbc284..48a0b3c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -135,7 +135,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
 
 	/** The config to turn on adaptive reads from a shard. */
-	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptive.read.records.enabled";
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
 
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c3d3ff37/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index c2924e2..dbc7118 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -171,7 +171,7 @@ public class ShardConsumerTest {
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
 		Properties consumerProperties = new Properties();
-		consumerProperties.put("flink.shard.adaptive.read.records.enabled", "true");
+		consumerProperties.put("flink.shard.adaptivereads", "true");
 
 		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);