You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 17:39:46 UTC

[flink] branch master updated (61352fb -> 6b1be17)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 61352fb  [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
     new 8da6965  [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
     new 6b1be17  [hotfix] Let FlinkKinesisConsumerTest extend TestLogger

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kinesis/internals/KinesisDataFetcher.java      |  9 +---
 .../connectors/kinesis/util/RecordEmitter.java     |  9 ++--
 .../kinesis/FlinkKinesisConsumerTest.java          | 48 +++++++++++++++-------
 .../testutils/FakeKinesisBehavioursFactory.java    |  2 +-
 4 files changed, 40 insertions(+), 28 deletions(-)


[flink] 02/02: [hotfix] Let FlinkKinesisConsumerTest extend TestLogger

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b1be170a617c7b3a75a9c7dbbc5e4d369476459
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 2 16:10:33 2019 +0200

    [hotfix] Let FlinkKinesisConsumerTest extend TestLogger
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 0eb7bb3..1a910cb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -103,7 +104,7 @@ import static org.mockito.Mockito.when;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
-public class FlinkKinesisConsumerTest {
+public class FlinkKinesisConsumerTest extends TestLogger {
 
 	@Rule
 	private ExpectedException exception = ExpectedException.none();


[flink] 01/02: [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8da696535d0c39323f480cae8f4e9c66e866bec4
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Jul 16 12:41:22 2019 -0700

    [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
    
    This closes #9183.
---
 .../kinesis/internals/KinesisDataFetcher.java      |  9 +----
 .../connectors/kinesis/util/RecordEmitter.java     |  9 ++---
 .../kinesis/FlinkKinesisConsumerTest.java          | 45 +++++++++++++++-------
 .../testutils/FakeKinesisBehavioursFactory.java    |  2 +-
 4 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index eae3153..f38e6eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -270,8 +270,6 @@ public class KinesisDataFetcher<T> {
 		@Override
 		public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
 			emitRecordAndUpdateState(record);
-			ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
-			sws.lastEmittedRecordWatermark = record.watermark;
 		}
 	}
 
@@ -290,11 +288,6 @@ public class KinesisDataFetcher<T> {
 					}
 
 					@Override
-					public int getQueueId() {
-						return producerIndex;
-					}
-
-					@Override
 					public int getSize() {
 						return 0;
 					}
@@ -770,6 +763,8 @@ public class KinesisDataFetcher<T> {
 		synchronized (checkpointLock) {
 			if (rw.getValue() != null) {
 				sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
+				ShardWatermarkState<T> sws = shardWatermarks.get(rw.shardStateIndex);
+				sws.lastEmittedRecordWatermark = rw.watermark;
 			} else {
 				LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
 					rw.lastSequenceNumber,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..da74b08 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -79,8 +79,6 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 	public interface RecordQueue<T> {
 		void put(T record) throws InterruptedException;
 
-		int getQueueId();
-
 		int getSize();
 
 		T peek();
@@ -98,6 +96,7 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 			this.headTimestamp = Long.MAX_VALUE;
 		}
 
+		@Override
 		public void put(T record) throws InterruptedException {
 			queue.put(record);
 			synchronized (condition) {
@@ -105,14 +104,12 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 			}
 		}
 
-		public int getQueueId() {
-			return queueId;
-		}
-
+		@Override
 		public int getSize() {
 			return queue.size();
 		}
 
+		@Override
 		public T peek() {
 			return queue.peek();
 		}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index cbcd8b4..0eb7bb3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
@@ -72,7 +74,9 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -531,8 +535,8 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	/**
-	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
-	 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#getShardId()} or
+	 * {@link StreamShardMetadata#getStreamName()} does not result in the shard not being able to be restored.
 	 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
 	 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
 	 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
@@ -833,6 +837,7 @@ public class FlinkKinesisConsumerTest {
 		final long autoWatermarkInterval = 1_000;
 		final long watermarkSyncInterval = autoWatermarkInterval + 1;
 
+		TestWatermarkTracker.WATERMARK.set(0);
 		HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
 		subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
 
@@ -846,10 +851,9 @@ public class FlinkKinesisConsumerTest {
 		props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
 
 		BlockingQueue<String> shard1 = new LinkedBlockingQueue();
-		BlockingQueue<String> shard2 = new LinkedBlockingQueue();
 
 		Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
-		streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+		streamToQueueMap.put(streamName, Collections.singletonList(shard1));
 
 		// override createFetcher to mock Kinesis
 		FlinkKinesisConsumer<String> sourceFunc =
@@ -878,7 +882,16 @@ public class FlinkKinesisConsumerTest {
 							subscribedStreamsToLastDiscoveredShardIds,
 							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
 								streamToQueueMap)
-						) {};
+						) {
+							@Override
+							protected void emitWatermark() {
+								// necessary in this test to ensure that watermark state is updated
+								// before the watermark timer callback is triggered
+								synchronized (sourceContext.getCheckpointLock()) {
+									super.emitWatermark();
+								}
+							}
+						};
 					return fetcher;
 				}
 			};
@@ -952,27 +965,33 @@ public class FlinkKinesisConsumerTest {
 
 		// trigger sync
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
-		TestWatermarkTracker.assertSingleWatermark(-4);
+		TestWatermarkTracker.assertGlobalWatermark(-4);
 
 		final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
 		shard1.put(Long.toString(record2));
 
-		// TODO: check for record received instead
-		Thread.sleep(100);
+		// wait for the record to be buffered in the emitter
+		final RecordEmitter<?> emitter = org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter");
+		RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0);
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+		while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
+			Thread.sleep(10);
+		}
+		assertEquals("first record received", 1, emitterQueue.getSize());
 
 		// Advance the watermark. Since the new record is past global watermark + threshold,
 		// it won't be emitted and the watermark does not advance
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
 		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
 		assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
-		TestWatermarkTracker.assertSingleWatermark(-4);
+		TestWatermarkTracker.assertGlobalWatermark(-4);
 
 		// Trigger global watermark sync
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
 		expectedResults.add(Long.toString(record2));
 		awaitRecordCount(results, expectedResults.size());
 		assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
-		TestWatermarkTracker.assertSingleWatermark(3000);
+		TestWatermarkTracker.assertGlobalWatermark(3000);
 
 		// Trigger watermark update and emit
 		testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
@@ -984,8 +1003,8 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
-		long timeoutMillis = System.currentTimeMillis() + 10_000;
-		while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
+		Deadline deadline  = Deadline.fromNow(Duration.ofSeconds(10));
+		while (deadline.hasTimeLeft() && queue.size() < count) {
 			Thread.sleep(10);
 		}
 	}
@@ -1018,7 +1037,7 @@ public class FlinkKinesisConsumerTest {
 			return localWatermark;
 		}
 
-		static void assertSingleWatermark(long expected) {
+		static void assertGlobalWatermark(long expected) {
 			Assert.assertEquals(expected, WATERMARK.get());
 		}
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 20373ff..ee4e0a3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -499,7 +499,7 @@ public class FakeKinesisBehavioursFactory {
 				String data = queue.take();
 				Record record = new Record()
 					.withData(
-						ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+						ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
 					.withPartitionKey(UUID.randomUUID().toString())
 					.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
 					.withSequenceNumber(String.valueOf(0));