You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/17 11:16:03 UTC
[1/2] flink git commit: [FLINK-9692] [kinesis] Adaptive reads from
Kinesis
Repository: flink
Updated Branches:
refs/heads/release-1.6 7a91f3071 -> c3d3ff37d
[FLINK-9692] [kinesis] Adaptive reads from Kinesis
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8005a2eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8005a2eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8005a2eb
Branch: refs/heads/release-1.6
Commit: 8005a2ebac1afbec6fcf43a4442f51f442f33590
Parents: 7a91f30
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
Authored: Tue Jul 10 11:40:02 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 17 10:07:45 2018 +0200
----------------------------------------------------------------------
.../kinesis/config/ConsumerConfigConstants.java | 6 ++
.../kinesis/internals/ShardConsumer.java | 38 ++++++-
.../kinesis/internals/ShardConsumerTest.java | 45 ++++++++
.../testutils/FakeKinesisBehavioursFactory.java | 107 +++++++++++++++++++
4 files changed, 195 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index e46f79e..bcbc284 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -134,6 +134,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
+ /** The config to turn on adaptive reads from a shard. */
+ public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptive.read.records.enabled";
+
+
// ------------------------------------------------------------------------
// Default values for consumer configuration
// ------------------------------------------------------------------------
@@ -179,6 +183,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
+ public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
+
/**
* To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
* getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 30f0016..77d180c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -56,6 +56,10 @@ public class ShardConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+ // AWS Kinesis has a read limit of 2 MB/sec
+ // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+ private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
private final KinesisDeserializationSchema<T> deserializer;
private final KinesisProxyInterface kinesis;
@@ -66,8 +70,9 @@ public class ShardConsumer<T> implements Runnable {
private final StreamShardHandle subscribedShard;
- private final int maxNumberOfRecordsPerFetch;
+ private int maxNumberOfRecordsPerFetch;
private final long fetchIntervalMillis;
+ private final boolean useAdaptiveReads;
private final ShardMetricsReporter shardMetricsReporter;
@@ -125,6 +130,9 @@ public class ShardConsumer<T> implements Runnable {
this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+ this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
+ Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
@@ -224,10 +232,19 @@ public class ShardConsumer<T> implements Runnable {
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+ long recordBatchSizeBytes = 0L;
+ long averageRecordSizeBytes = 0L;
+
for (UserRecord record : fetchedRecords) {
+ recordBatchSizeBytes += record.getData().remaining();
deserializeRecordForCollectionAndUpdateState(record);
}
+ if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
+ averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
+ maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
+ }
+
nextShardItr = getRecordsResult.getNextShardIterator();
}
}
@@ -330,4 +347,23 @@ public class ShardConsumer<T> implements Runnable {
protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+ /**
+ * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+ * to make optimal use of the 2 MB/sec read limit.
+ *
+ * @param averageRecordSizeBytes
+ * @return adaptedMaxRecordsPerFetch
+ */
+
+ protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+ int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+ if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+ adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+ // Ensure the value is not more than 10000L
+ adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+ }
+ return adaptedMaxRecordsPerFetch;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 4478b2f..c2924e2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -168,6 +168,51 @@ public class ShardConsumerTest {
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
+ @Test
+ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+ Properties consumerProperties = new Properties();
+ consumerProperties.put("flink.shard.adaptive.read.records.enabled", "true");
+
+ StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+
+ LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+ subscribedShardsStateUnderTest.add(
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
+ fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+ TestSourceContext<String> sourceContext = new TestSourceContext<>();
+
+ TestableKinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<>(
+ Collections.singletonList("fakeStream"),
+ sourceContext,
+ consumerProperties,
+ new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+ 10,
+ 2,
+ new AtomicReference<>(),
+ subscribedShardsStateUnderTest,
+ KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+ Mockito.mock(KinesisProxyInterface.class));
+
+ new ShardConsumer<>(
+ fetcher,
+ 0,
+ subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
+ subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+ // Initial number of records to fetch --> 10
+ FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
+ new ShardMetricsReporter()).run();
+
+ // Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
+ // Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
+ // Total number of records = 10 + 40 = 50
+ assertEquals(50, sourceContext.getCollectedOutputs().size());
+ assertEquals(
+ SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+ subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+ }
+
private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
return new StreamShardHandle(
streamName,
http://git-wip-us.apache.org/repos/asf/flink/blob/8005a2eb/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index e403623..eb34155 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -93,6 +94,14 @@ public class FakeKinesisBehavioursFactory {
numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest);
}
+ public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(
+ final int numOfRecords,
+ final int numOfGetRecordsCalls,
+ final long millisBehindLatest) {
+ return new SingleShardEmittingAdaptiveNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls,
+ millisBehindLatest);
+ }
+
private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
private final long millisBehindLatest;
@@ -227,6 +236,104 @@ public class FakeKinesisBehavioursFactory {
}
+ private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis implements
+ KinesisProxyInterface {
+
+ protected final int totalNumOfGetRecordsCalls;
+
+ protected final int totalNumOfRecords;
+
+ private final long millisBehindLatest;
+
+ protected final Map<String, List<Record>> shardItrToRecordBatch;
+
+ protected static long averageRecordSizeBytes;
+
+ private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+ public SingleShardEmittingAdaptiveNumOfRecordsKinesis(final int numOfRecords,
+ final int numOfGetRecordsCalls,
+ final long millisBehindLatest) {
+ this.totalNumOfRecords = numOfRecords;
+ this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
+ this.millisBehindLatest = millisBehindLatest;
+ this.averageRecordSizeBytes = 0L;
+
+ // initialize the record batches that will be fetched
+ this.shardItrToRecordBatch = new HashMap<>();
+
+ int numOfAlreadyPartitionedRecords = 0;
+ int numOfRecordsPerBatch = numOfRecords;
+ for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
+ shardItrToRecordBatch.put(
+ String.valueOf(batch),
+ createRecordBatchWithRange(
+ numOfAlreadyPartitionedRecords,
+ numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
+ numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
+
+ numOfRecordsPerBatch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT /
+ (averageRecordSizeBytes * 1000L / ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS));
+ }
+ }
+
+ @Override
+ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+ // assuming that the maxRecordsToGet is always large enough
+ return new GetRecordsResult()
+ .withRecords(shardItrToRecordBatch.get(shardIterator))
+ .withMillisBehindLatest(millisBehindLatest)
+ .withNextShardIterator(
+ (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+ ? null : String
+ .valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
+ }
+
+ @Override
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType,
+ Object startingMarker) {
+ // this will be called only one time per ShardConsumer;
+ // so, simply return the iterator of the first batch of records
+ return "0";
+ }
+
+ @Override
+ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+ return null;
+ }
+
+ public static List<Record> createRecordBatchWithRange(int min, int max) {
+ List<Record> batch = new LinkedList<>();
+ long sumRecordBatchBytes = 0L;
+ // Create record of size 10Kb
+ String data = createDataSize(10 * 1024L);
+
+ for (int i = min; i < max; i++) {
+ Record record = new Record()
+ .withData(
+ ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+ .withPartitionKey(UUID.randomUUID().toString())
+ .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
+ .withSequenceNumber(String.valueOf(i));
+ batch.add(record);
+ sumRecordBatchBytes += record.getData().remaining();
+
+ }
+ if (batch.size() != 0) {
+ averageRecordSizeBytes = sumRecordBatchBytes / batch.size();
+ }
+
+ return batch;
+ }
+
+ private static String createDataSize(long msgSize) {
+ char[] data = new char[(int) msgSize];
+ return new String(data);
+
+ }
+
+ }
+
private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
[2/2] flink git commit: [FLINK-9692] [kinesis] Harmonize style of
config variable names
Posted by se...@apache.org.
[FLINK-9692] [kinesis] Harmonize style of config variable names
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3d3ff37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3d3ff37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3d3ff37
Branch: refs/heads/release-1.6
Commit: c3d3ff37de47e9302ba2a7397bef7933fab82f81
Parents: 8005a2e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 17 10:16:50 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 17 10:16:50 2018 +0200
----------------------------------------------------------------------
.../connectors/kinesis/config/ConsumerConfigConstants.java | 2 +-
.../streaming/connectors/kinesis/internals/ShardConsumerTest.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3d3ff37/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index bcbc284..48a0b3c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -135,7 +135,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
/** The config to turn on adaptive reads from a shard. */
- public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptive.read.records.enabled";
+ public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3d3ff37/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index c2924e2..dbc7118 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -171,7 +171,7 @@ public class ShardConsumerTest {
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
Properties consumerProperties = new Properties();
- consumerProperties.put("flink.shard.adaptive.read.records.enabled", "true");
+ consumerProperties.put("flink.shard.adaptivereads", "true");
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);