You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the Apache mailing-list archive.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 17:39:46 UTC
[flink] branch master updated (61352fb -> 6b1be17)
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 61352fb [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
new 8da6965 [FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
new 6b1be17 [hotfix] Let FlinkKinesisConsumerTest extend TestLogger
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../kinesis/internals/KinesisDataFetcher.java | 9 +---
.../connectors/kinesis/util/RecordEmitter.java | 9 ++--
.../kinesis/FlinkKinesisConsumerTest.java | 48 +++++++++++++++-------
.../testutils/FakeKinesisBehavioursFactory.java | 2 +-
4 files changed, 40 insertions(+), 28 deletions(-)
[flink] 02/02: [hotfix] Let FlinkKinesisConsumerTest extend
TestLogger
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b1be170a617c7b3a75a9c7dbbc5e4d369476459
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 2 16:10:33 2019 +0200
[hotfix] Let FlinkKinesisConsumerTest extend TestLogger
---
.../flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 0eb7bb3..1a910cb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ import static org.mockito.Mockito.when;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
-public class FlinkKinesisConsumerTest {
+public class FlinkKinesisConsumerTest extends TestLogger {
@Rule
private ExpectedException exception = ExpectedException.none();
[flink] 01/02: [FLINK-12768][tests] Fix
FlinkKinesisConsumerTest.testSourceSynchronization race condition
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8da696535d0c39323f480cae8f4e9c66e866bec4
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Jul 16 12:41:22 2019 -0700
[FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchronization race condition
This closes #9183.
---
.../kinesis/internals/KinesisDataFetcher.java | 9 +----
.../connectors/kinesis/util/RecordEmitter.java | 9 ++---
.../kinesis/FlinkKinesisConsumerTest.java | 45 +++++++++++++++-------
.../testutils/FakeKinesisBehavioursFactory.java | 2 +-
4 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index eae3153..f38e6eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -270,8 +270,6 @@ public class KinesisDataFetcher<T> {
@Override
public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
emitRecordAndUpdateState(record);
- ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
- sws.lastEmittedRecordWatermark = record.watermark;
}
}
@@ -290,11 +288,6 @@ public class KinesisDataFetcher<T> {
}
@Override
- public int getQueueId() {
- return producerIndex;
- }
-
- @Override
public int getSize() {
return 0;
}
@@ -770,6 +763,8 @@ public class KinesisDataFetcher<T> {
synchronized (checkpointLock) {
if (rw.getValue() != null) {
sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
+ ShardWatermarkState<T> sws = shardWatermarks.get(rw.shardStateIndex);
+ sws.lastEmittedRecordWatermark = rw.watermark;
} else {
LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
rw.lastSequenceNumber,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..da74b08 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -79,8 +79,6 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
public interface RecordQueue<T> {
void put(T record) throws InterruptedException;
- int getQueueId();
-
int getSize();
T peek();
@@ -98,6 +96,7 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
this.headTimestamp = Long.MAX_VALUE;
}
+ @Override
public void put(T record) throws InterruptedException {
queue.put(record);
synchronized (condition) {
@@ -105,14 +104,12 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
}
}
- public int getQueueId() {
- return queueId;
- }
-
+ @Override
public int getSize() {
return queue.size();
}
+ @Override
public T peek() {
return queue.peek();
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index cbcd8b4..0eb7bb3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -52,6 +53,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
@@ -72,7 +74,9 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -531,8 +535,8 @@ public class FlinkKinesisConsumerTest {
}
/**
- * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
- * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+ * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#getShardId()} or
+ * {@link StreamShardMetadata#getStreamName()} does not result in the shard not being able to be restored.
* This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
* job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
* {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
@@ -833,6 +837,7 @@ public class FlinkKinesisConsumerTest {
final long autoWatermarkInterval = 1_000;
final long watermarkSyncInterval = autoWatermarkInterval + 1;
+ TestWatermarkTracker.WATERMARK.set(0);
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
@@ -846,10 +851,9 @@ public class FlinkKinesisConsumerTest {
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));
BlockingQueue<String> shard1 = new LinkedBlockingQueue();
- BlockingQueue<String> shard2 = new LinkedBlockingQueue();
Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
- streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
+ streamToQueueMap.put(streamName, Collections.singletonList(shard1));
// override createFetcher to mock Kinesis
FlinkKinesisConsumer<String> sourceFunc =
@@ -878,7 +882,16 @@ public class FlinkKinesisConsumerTest {
subscribedStreamsToLastDiscoveredShardIds,
(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
streamToQueueMap)
- ) {};
+ ) {
+ @Override
+ protected void emitWatermark() {
+ // necessary in this test to ensure that watermark state is updated
+ // before the watermark timer callback is triggered
+ synchronized (sourceContext.getCheckpointLock()) {
+ super.emitWatermark();
+ }
+ }
+ };
return fetcher;
}
};
@@ -952,27 +965,33 @@ public class FlinkKinesisConsumerTest {
// trigger sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
- TestWatermarkTracker.assertSingleWatermark(-4);
+ TestWatermarkTracker.assertGlobalWatermark(-4);
final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
shard1.put(Long.toString(record2));
- // TODO: check for record received instead
- Thread.sleep(100);
+ // wait for the record to be buffered in the emitter
+ final RecordEmitter<?> emitter = org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter");
+ RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0);
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
+ Thread.sleep(10);
+ }
+ assertEquals("first record received", 1, emitterQueue.getSize());
// Advance the watermark. Since the new record is past global watermark + threshold,
// it won't be emitted and the watermark does not advance
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
- TestWatermarkTracker.assertSingleWatermark(-4);
+ TestWatermarkTracker.assertGlobalWatermark(-4);
// Trigger global watermark sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
expectedResults.add(Long.toString(record2));
awaitRecordCount(results, expectedResults.size());
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
- TestWatermarkTracker.assertSingleWatermark(3000);
+ TestWatermarkTracker.assertGlobalWatermark(3000);
// Trigger watermark update and emit
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
@@ -984,8 +1003,8 @@ public class FlinkKinesisConsumerTest {
}
private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
- long timeoutMillis = System.currentTimeMillis() + 10_000;
- while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (deadline.hasTimeLeft() && queue.size() < count) {
Thread.sleep(10);
}
}
@@ -1018,7 +1037,7 @@ public class FlinkKinesisConsumerTest {
return localWatermark;
}
- static void assertSingleWatermark(long expected) {
+ static void assertGlobalWatermark(long expected) {
Assert.assertEquals(expected, WATERMARK.get());
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 20373ff..ee4e0a3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -499,7 +499,7 @@ public class FakeKinesisBehavioursFactory {
String data = queue.take();
Record record = new Record()
.withData(
- ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+ ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(0));