You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/06 16:38:04 UTC
[flink-connector-pulsar] 03/16: [FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit b0a1678f91dae9b7bee35f77c79b441f365499be
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Sep 1 16:15:33 2022 +0200
[FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests
---
.../split/PulsarPartitionSplitReaderBase.java | 9 +++---
.../source/PulsarOrderedSourceReaderTest.java | 32 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index c1459a6..b884c57 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -162,7 +162,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
Preconditions.checkArgument(
- newSplits.size() == 1, "This pulsar split reader only support one split.");
+ newSplits.size() == 1, "This pulsar split reader only supports one split.");
this.registeredSplit = newSplits.get(0);
// Open stop cursor.
@@ -184,9 +184,10 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
public void pauseOrResumeSplits(
Collection<PulsarPartitionSplit> splitsToPause,
Collection<PulsarPartitionSplit> splitsToResume) {
- if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
- throw new IllegalStateException("This pulsar split reader only support one split.");
- }
+ // This shouldn't happen but just in case...
+ Preconditions.checkState(
+ splitsToPause.size() + splitsToResume.size() <= 1,
+ "This pulsar split reader only supports one split.");
if (!splitsToPause.isEmpty()) {
pulsarConsumer.pause();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
index 9806108..4ec1912 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -29,11 +29,13 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.util.Collections;
@@ -130,6 +132,36 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
}
}
+    @TestTemplate
+    @Timeout(600)
+    void supportsPausingOrResumingSplits(
+            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
+            throws Exception {
+        final PulsarPartitionSplit split =
+                createPartitionSplit(topicName, 0, boundedness, MessageId.earliest);
+        // Register the single split with the reader before exercising pause/resume.
+        reader.addSplits(Collections.singletonList(split));
+        // Sink handed to pollNext(); its contents are not inspected by this test.
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+        // Pause: the split id goes in the first (to-pause) argument, nothing to resume.
+        reader.pauseOrResumeSplits(
+                Collections.singletonList(split.splitId()), Collections.emptyList());
+        // While the split is paused, a poll is expected to report NOTHING_AVAILABLE.
+        InputStatus status = reader.pollNext(output);
+        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+        // Resume: the same split id now goes in the second (to-resume) argument.
+        reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton(split.splitId()));
+        // Poll, sleeping 5 ms after each attempt, until MORE_AVAILABLE; only @Timeout bounds this.
+        do {
+            status = reader.pollNext(output);
+            Thread.sleep(5);
+        } while (status != InputStatus.MORE_AVAILABLE);
+        // Always true once the loop exits; kept to state the expected end state explicitly.
+        assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE);
+        // NOTE(review): close() is not reached if an assertion above fails - consider try/finally.
+        reader.close();
+    }
+
private void setupSourceReader(
PulsarSourceReaderBase<Integer> reader,
String topicName,