You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 09:34:31 UTC

[3/6] flink git commit: [FLINK-8516] [kinesis] Allow for custom hash function for shard to subtask mapping in Kinesis consumer

[FLINK-8516] [kinesis] Allow for custom hash function for shard to subtask mapping in Kinesis consumer

This closes #5393.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a231c3c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a231c3c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a231c3c2

Branch: refs/heads/release-1.4
Commit: a231c3c2f6e2aa1148d002a517ee67226ea78af4
Parents: 8b046fa
Author: Thomas Weise <th...@apache.org>
Authored: Tue Jan 30 17:44:44 2018 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Jun 17 20:59:01 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  5 ++
 .../kinesis/FlinkKinesisConsumer.java           | 36 ++++++++++-
 .../kinesis/KinesisShardAssigner.java           | 53 ++++++++++++++++
 .../kinesis/internals/KinesisDataFetcher.java   | 23 +++++--
 .../FlinkKinesisConsumerMigrationTest.java      |  3 +-
 .../internals/KinesisDataFetcherTest.java       | 65 ++++++++++++++++++++
 .../testutils/TestableKinesisDataFetcher.java   |  1 +
 7 files changed, 176 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 9bcd70a..4ee9c6b 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -119,6 +119,11 @@ then some consumer subtasks will simply be idle and wait until it gets assigned
 new shards (i.e., when the streams are resharded to increase the
 number of shards for higher provisioned Kinesis service throughput).
 
+Also note that the assignment of shards to subtasks may not be optimal when
+shard IDs are not consecutive (as a result of dynamic re-sharding in Kinesis).
+For cases where skew in the assignment leads to significantly imbalanced consumption,
+a custom implementation of `KinesisShardAssigner` can be set on the consumer.
+
 ### Configuring Starting Position
 
 The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index bf5f779..bea8bf6 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -67,6 +68,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
  * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
  *
+ * <p>Kinesis and the Flink consumer support dynamic re-sharding; shard IDs, while
+ * sequential, cannot be assumed to be consecutive. There is no perfect generic default assignment function.
+ * Default shard to subtask assignment, which is based on hash code, may result in skew,
+ * with some subtasks having many shards assigned and others none.
+ *
+ * <p>It is recommended to monitor the shard distribution and adjust assignment appropriately.
+ * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
+ * hash function or use static overrides to limit skew.
+ *
  * @param <T> the type of data emitted
  */
 public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
@@ -91,6 +101,11 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
 	private final KinesisDeserializationSchema<T> deserializer;
 
+	/**
+	 * The function that determines which subtask a shard should be assigned to.
+	 */
+	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
+
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
@@ -190,6 +205,19 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		}
 	}
 
+	public KinesisShardAssigner getShardAssigner() {
+		return shardAssigner;
+	}
+
+	/**
+	 * Provide a custom assigner to influence how shards are distributed over subtasks.
+	 * @param shardAssigner
+	 */
+	public void setShardAssigner(KinesisShardAssigner shardAssigner) {
+		this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
+		ClosureCleaner.clean(shardAssigner, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -349,9 +377,11 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 					for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
 						// sequenceNumsToRestore is the restored global union state;
 						// should only snapshot shards that actually belong to us
-
+						int hashCode = shardAssigner.assign(
+							KinesisDataFetcher.convertToStreamShardHandle(entry.getKey().getShardMetadata()),
+							getRuntimeContext().getNumberOfParallelSubtasks());
 						if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
-								KinesisDataFetcher.convertToStreamShardHandle(entry.getKey().getShardMetadata()),
+								hashCode,
 								getRuntimeContext().getNumberOfParallelSubtasks(),
 								getRuntimeContext().getIndexOfThisSubtask())) {
 
@@ -382,7 +412,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner);
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java
new file mode 100644
index 0000000..76e3cd6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices. Users can implement this interface to optimize
+ * distribution of shards over subtasks. See {@link #assign(StreamShardHandle, int)}  for details.
+ */
+@PublicEvolving
+public interface KinesisShardAssigner extends Serializable {
+
+    /**
+	 * Returns the index of the target subtask that a specific shard should be
+	 * assigned to. For return values outside the subtask range, modulus operation will
+	 * be applied automatically, hence it is also valid to just return a hash code.
+	 *
+	 * <p>The resulting distribution of shards should have the following contract:
+	 * <ul>
+	 *     <li>1. Uniform distribution across subtasks</li>
+	 *     <li>2. Deterministic, calls for a given shard always return same index.</li>
+	 * </ul>
+	 *
+	 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+	 * contract to filter out shards that they should not subscribe to, guaranteeing
+	 * that each shard of a stream will always be assigned to one subtask in a
+	 * uniformly distributed manner.
+	 *
+     * @param shard the shard to determine
+     * @param numParallelSubtasks total number of subtasks
+     * @return target index, if index falls outside of the range, modulus operation will be applied
+     */
+	int assign(StreamShardHandle shard, int numParallelSubtasks);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 4634421..abce21b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
@@ -73,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class KinesisDataFetcher<T> {
 
+	public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode();
+
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
 	// ------------------------------------------------------------------------
@@ -92,6 +95,11 @@ public class KinesisDataFetcher<T> {
 	 */
 	private final KinesisDeserializationSchema<T> deserializationSchema;
 
+	/**
+	 * The function that determines which subtask a shard should be assigned to.
+	 */
+	private final KinesisShardAssigner shardAssigner;
+
 	// ------------------------------------------------------------------------
 	//  Subtask-specific settings
 	// ------------------------------------------------------------------------
@@ -172,13 +180,15 @@ public class KinesisDataFetcher<T> {
 							SourceFunction.SourceContext<T> sourceContext,
 							RuntimeContext runtimeContext,
 							Properties configProps,
-							KinesisDeserializationSchema<T> deserializationSchema) {
+							KinesisDeserializationSchema<T> deserializationSchema,
+							KinesisShardAssigner shardAssigner) {
 		this(streams,
 			sourceContext,
 			sourceContext.getCheckpointLock(),
 			runtimeContext,
 			configProps,
 			deserializationSchema,
+			shardAssigner,
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -192,6 +202,7 @@ public class KinesisDataFetcher<T> {
 								RuntimeContext runtimeContext,
 								Properties configProps,
 								KinesisDeserializationSchema<T> deserializationSchema,
+								KinesisShardAssigner shardAssigner,
 								AtomicReference<Throwable> error,
 								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -204,6 +215,7 @@ public class KinesisDataFetcher<T> {
 		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
 		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 		this.deserializationSchema = checkNotNull(deserializationSchema);
+		this.shardAssigner = checkNotNull(shardAssigner);
 		this.kinesis = checkNotNull(kinesis);
 
 		this.error = checkNotNull(error);
@@ -436,7 +448,8 @@ public class KinesisDataFetcher<T> {
 			for (String stream : streamsWithNewShards) {
 				List<StreamShardHandle> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
 				for (StreamShardHandle newShard : newShardsOfStream) {
-					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
+					int hashCode = shardAssigner.assign(newShard, totalNumberOfConsumerSubtasks);
+					if (isThisSubtaskShouldSubscribeTo(hashCode, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
 						newShardsToSubscribe.add(newShard);
 					}
 				}
@@ -551,14 +564,14 @@ public class KinesisDataFetcher<T> {
 	/**
 	 * Utility function to determine whether a shard should be subscribed by this consumer subtask.
 	 *
-	 * @param shard the shard to determine
+	 * @param shardHash hash code for the shard
 	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
 	 * @param indexOfThisConsumerSubtask index of this consumer subtask
 	 */
-	public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
+	public static boolean isThisSubtaskShouldSubscribeTo(int shardHash,
 														int totalNumberOfConsumerSubtasks,
 														int indexOfThisConsumerSubtask) {
-		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
+		return (Math.abs(shardHash % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 541d464..62a5796 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
-
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -424,7 +423,7 @@ public class FlinkKinesisConsumerMigrationTest {
 				HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
 				List<StreamShardHandle> testInitialDiscoveryShards) {
 
-			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema);
+			super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
 
 			this.testStateSnapshot = testStateSnapshot;
 			this.testInitialDiscoveryShards = testInitialDiscoveryShards;

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 47d4653..838bc09 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -39,6 +40,7 @@ import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.junit.Test;
+import org.powermock.reflect.Whitebox;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -51,6 +53,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -601,4 +604,66 @@ public class KinesisDataFetcherTest extends TestLogger {
 
 		return config;
 	}
+
+	// ----------------------------------------------------------------------
+	// Tests shard distribution with custom hash function
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testShardToSubtaskMappingWithCustomHashFunction() throws Exception {
+
+		int totalCountOfSubtasks = 10;
+		int shardCount = 3;
+
+		for (int i = 0; i < 2; i++) {
+
+			final int hash = i;
+			final KinesisShardAssigner allShardsSingleSubtaskFn = (shard, subtasks) -> hash;
+			Map<String, Integer> streamToShardCount = new HashMap<>();
+			List<String> fakeStreams = new LinkedList<>();
+			fakeStreams.add("fakeStream");
+			streamToShardCount.put("fakeStream", shardCount);
+
+			for (int j = 0; j < totalCountOfSubtasks; j++) {
+
+				int subtaskIndex = j;
+				// subscribe with default hashing
+				final TestableKinesisDataFetcher fetcher =
+					new TestableKinesisDataFetcher(
+						fakeStreams,
+						new TestSourceContext<>(),
+						new Properties(),
+						new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+						totalCountOfSubtasks,
+						subtaskIndex,
+						new AtomicReference<>(),
+						new LinkedList<>(),
+						KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams),
+						FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+				Whitebox.setInternalState(fetcher, "shardAssigner", allShardsSingleSubtaskFn); // override hashing
+				List<StreamShardHandle> shards = fetcher.discoverNewShardsToSubscribe();
+				fetcher.shutdownFetcher();
+
+				String msg = String.format("for hash=%d, subtask=%d", hash, subtaskIndex);
+				if (j == i) {
+					assertEquals(msg, shardCount, shards.size());
+				} else {
+					assertEquals(msg, 0, shards.size());
+				}
+			}
+
+		}
+
+	}
+
+	@Test
+	public void testIsThisSubtaskShouldSubscribeTo() {
+		assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(0, 2, 0));
+		assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(1, 2, 0));
+		assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 0));
+		assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(0, 2, 1));
+		assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(1, 2, 1));
+		assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a231c3c2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 83af7a0..1edd333 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -67,6 +67,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 			getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask),
 			fakeConfiguration,
 			deserializationSchema,
+			DEFAULT_SHARD_ASSIGNER,
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,