You are viewing a plain-text version of this content. The canonical version is available at the original mail-archive link, which is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 17:39:16 UTC

[flink] 01/02: [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66d0e31294b2588e1aabccb952fdbf2bcfabe878
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Jul 16 12:41:22 2019 -0700

    [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
    
    This closes #9183.
---
 .../kinesis/internals/KinesisDataFetcher.java      |  9 +----
 .../connectors/kinesis/util/RecordEmitter.java     |  9 ++---
 .../kinesis/FlinkKinesisConsumerTest.java          | 45 +++++++++++++++-------
 .../testutils/FakeKinesisBehavioursFactory.java    |  2 +-
 4 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index eae3153..f38e6eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -270,8 +270,6 @@ public class KinesisDataFetcher<T> {
 		@Override
 		public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
 			emitRecordAndUpdateState(record);
-			ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
-			sws.lastEmittedRecordWatermark = record.watermark;
 		}
 	}
 
@@ -290,11 +288,6 @@ public class KinesisDataFetcher<T> {
 					}
 
 					@Override
-					public int getQueueId() {
-						return producerIndex;
-					}
-
-					@Override
 					public int getSize() {
 						return 0;
 					}
@@ -770,6 +763,8 @@ public class KinesisDataFetcher<T> {
 		synchronized (checkpointLock) {
 			if (rw.getValue() != null) {
 				sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
+				ShardWatermarkState<T> sws = shardWatermarks.get(rw.shardStateIndex);
+				sws.lastEmittedRecordWatermark = rw.watermark;
 			} else {
 				LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
 					rw.lastSequenceNumber,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..da74b08 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -79,8 +79,6 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 	public interface RecordQueue<T> {
 		void put(T record) throws InterruptedException;
 
-		int getQueueId();
-
 		int getSize();
 
 		T peek();
@@ -98,6 +96,7 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 			this.headTimestamp = Long.MAX_VALUE;
 		}
 
+		@Override
 		public void put(T record) throws InterruptedException {
 			queue.put(record);
 			synchronized (condition) {
@@ -105,14 +104,12 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 			}
 		}
 
-		public int getQueueId() {
-			return queueId;
-		}
-
+		@Override
 		public int getSize() {
 			return queue.size();
 		}
 
+		@Override
 		public T peek() {
 			return queue.peek();
 		}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index cbcd8b4..0eb7bb3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
@@ -72,7 +74,9 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -531,8 +535,8 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	/**
-	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
-	 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#getShardId()} or
+	 * {@link StreamShardMetadata#getStreamName()} does not result in the shard not being able to be restored.
 	 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
 	 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
 	 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
@@ -833,6 +837,7 @@ public class FlinkKinesisConsumerTest {
 		final long autoWatermarkInterval = 1_000;
 		final long watermarkSyncInterval = autoWatermarkInterval + 1;
 
+		TestWatermarkTracker.WATERMARK.set(0);
 		HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
 		subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
 
@@ -846,10 +851,9 @@ public class FlinkKinesisConsumerTest {
 		props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
 
 		BlockingQueue<String> shard1 = new LinkedBlockingQueue();
-		BlockingQueue<String> shard2 = new LinkedBlockingQueue();
 
 		Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
-		streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+		streamToQueueMap.put(streamName, Collections.singletonList(shard1));
 
 		// override createFetcher to mock Kinesis
 		FlinkKinesisConsumer<String> sourceFunc =
@@ -878,7 +882,16 @@ public class FlinkKinesisConsumerTest {
 							subscribedStreamsToLastDiscoveredShardIds,
 							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
 								streamToQueueMap)
-						) {};
+						) {
+							@Override
+							protected void emitWatermark() {
+								// necessary in this test to ensure that watermark state is updated
+								// before the watermark timer callback is triggered
+								synchronized (sourceContext.getCheckpointLock()) {
+									super.emitWatermark();
+								}
+							}
+						};
 					return fetcher;
 				}
 			};
@@ -952,27 +965,33 @@ public class FlinkKinesisConsumerTest {
 
 		// trigger sync
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
-		TestWatermarkTracker.assertSingleWatermark(-4);
+		TestWatermarkTracker.assertGlobalWatermark(-4);
 
 		final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
 		shard1.put(Long.toString(record2));
 
-		// TODO: check for record received instead
-		Thread.sleep(100);
+		// wait for the record to be buffered in the emitter
+		final RecordEmitter<?> emitter = org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter");
+		RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0);
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+		while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
+			Thread.sleep(10);
+		}
+		assertEquals("first record received", 1, emitterQueue.getSize());
 
 		// Advance the watermark. Since the new record is past global watermark + threshold,
 		// it won't be emitted and the watermark does not advance
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
 		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
 		assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
-		TestWatermarkTracker.assertSingleWatermark(-4);
+		TestWatermarkTracker.assertGlobalWatermark(-4);
 
 		// Trigger global watermark sync
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
 		expectedResults.add(Long.toString(record2));
 		awaitRecordCount(results, expectedResults.size());
 		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
-		TestWatermarkTracker.assertSingleWatermark(3000);
+		TestWatermarkTracker.assertGlobalWatermark(3000);
 
 		// Trigger watermark update and emit
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
@@ -984,8 +1003,8 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
-		long timeoutMillis = System.currentTimeMillis() + 10_000;
-		while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
+		Deadline deadline  = Deadline.fromNow(Duration.ofSeconds(10));
+		while (deadline.hasTimeLeft() && queue.size() < count) {
 			Thread.sleep(10);
 		}
 	}
@@ -1018,7 +1037,7 @@ public class FlinkKinesisConsumerTest {
 			return localWatermark;
 		}
 
-		static void assertSingleWatermark(long expected) {
+		static void assertGlobalWatermark(long expected) {
 			Assert.assertEquals(expected, WATERMARK.get());
 		}
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 20373ff..ee4e0a3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -499,7 +499,7 @@ public class FakeKinesisBehavioursFactory {
 				String data = queue.take();
 				Record record = new Record()
 					.withData(
-						ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+						ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
 					.withPartitionKey(UUID.randomUUID().toString())
 					.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
 					.withSequenceNumber(String.valueOf(0));