You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 17:39:16 UTC
[flink] 01/02: [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 66d0e31294b2588e1aabccb952fdbf2bcfabe878
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Jul 16 12:41:22 2019 -0700
[FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
This closes #9183.
---
.../kinesis/internals/KinesisDataFetcher.java | 9 +----
.../connectors/kinesis/util/RecordEmitter.java | 9 ++---
.../kinesis/FlinkKinesisConsumerTest.java | 45 +++++++++++++++-------
.../testutils/FakeKinesisBehavioursFactory.java | 2 +-
4 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index eae3153..f38e6eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -270,8 +270,6 @@ public class KinesisDataFetcher<T> {
@Override
public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
emitRecordAndUpdateState(record);
- ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
- sws.lastEmittedRecordWatermark = record.watermark;
}
}
@@ -290,11 +288,6 @@ public class KinesisDataFetcher<T> {
}
@Override
- public int getQueueId() {
- return producerIndex;
- }
-
- @Override
public int getSize() {
return 0;
}
@@ -770,6 +763,8 @@ public class KinesisDataFetcher<T> {
synchronized (checkpointLock) {
if (rw.getValue() != null) {
sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
+ ShardWatermarkState<T> sws = shardWatermarks.get(rw.shardStateIndex);
+ sws.lastEmittedRecordWatermark = rw.watermark;
} else {
LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
rw.lastSequenceNumber,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..da74b08 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -79,8 +79,6 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
public interface RecordQueue<T> {
void put(T record) throws InterruptedException;
- int getQueueId();
-
int getSize();
T peek();
@@ -98,6 +96,7 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
this.headTimestamp = Long.MAX_VALUE;
}
+ @Override
public void put(T record) throws InterruptedException {
queue.put(record);
synchronized (condition) {
@@ -105,14 +104,12 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
}
}
- public int getQueueId() {
- return queueId;
- }
-
+ @Override
public int getSize() {
return queue.size();
}
+ @Override
public T peek() {
return queue.peek();
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index cbcd8b4..0eb7bb3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
@@ -72,7 +74,9 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -531,8 +535,8 @@ public class FlinkKinesisConsumerTest {
}
/**
- * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
- * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+ * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#getShardId()} or
+ * {@link StreamShardMetadata#getStreamName()} does not result in the shard not being able to be restored.
* This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
* job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
* {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
@@ -833,6 +837,7 @@ public class FlinkKinesisConsumerTest {
final long autoWatermarkInterval = 1_000;
final long watermarkSyncInterval = autoWatermarkInterval + 1;
+ TestWatermarkTracker.WATERMARK.set(0);
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
@@ -846,10 +851,9 @@ public class FlinkKinesisConsumerTest {
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
BlockingQueue<String> shard1 = new LinkedBlockingQueue();
- BlockingQueue<String> shard2 = new LinkedBlockingQueue();
Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
- streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+ streamToQueueMap.put(streamName, Collections.singletonList(shard1));
// override createFetcher to mock Kinesis
FlinkKinesisConsumer<String> sourceFunc =
@@ -878,7 +882,16 @@ public class FlinkKinesisConsumerTest {
subscribedStreamsToLastDiscoveredShardIds,
(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
streamToQueueMap)
- ) {};
+ ) {
+ @Override
+ protected void emitWatermark() {
+ // necessary in this test to ensure that watermark state is updated
+ // before the watermark timer callback is triggered
+ synchronized (sourceContext.getCheckpointLock()) {
+ super.emitWatermark();
+ }
+ }
+ };
return fetcher;
}
};
@@ -952,27 +965,33 @@ public class FlinkKinesisConsumerTest {
// trigger sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
- TestWatermarkTracker.assertSingleWatermark(-4);
+ TestWatermarkTracker.assertGlobalWatermark(-4);
final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
shard1.put(Long.toString(record2));
- // TODO: check for record received instead
- Thread.sleep(100);
+ // wait for the record to be buffered in the emitter
+ final RecordEmitter<?> emitter = org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter");
+ RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0);
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
+ Thread.sleep(10);
+ }
+ assertEquals("first record received", 1, emitterQueue.getSize());
// Advance the watermark. Since the new record is past global watermark + threshold,
// it won't be emitted and the watermark does not advance
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
- TestWatermarkTracker.assertSingleWatermark(-4);
+ TestWatermarkTracker.assertGlobalWatermark(-4);
// Trigger global watermark sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
expectedResults.add(Long.toString(record2));
awaitRecordCount(results, expectedResults.size());
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
- TestWatermarkTracker.assertSingleWatermark(3000);
+ TestWatermarkTracker.assertGlobalWatermark(3000);
// Trigger watermark update and emit
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
@@ -984,8 +1003,8 @@ public class FlinkKinesisConsumerTest {
}
private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
- long timeoutMillis = System.currentTimeMillis() + 10_000;
- while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (deadline.hasTimeLeft() && queue.size() < count) {
Thread.sleep(10);
}
}
@@ -1018,7 +1037,7 @@ public class FlinkKinesisConsumerTest {
return localWatermark;
}
- static void assertSingleWatermark(long expected) {
+ static void assertGlobalWatermark(long expected) {
Assert.assertEquals(expected, WATERMARK.get());
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 20373ff..ee4e0a3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -499,7 +499,7 @@ public class FakeKinesisBehavioursFactory {
String data = queue.take();
Record record = new Record()
.withData(
- ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+ ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(0));