You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/06 16:38:04 UTC

[flink-connector-pulsar] 03/16: [FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit b0a1678f91dae9b7bee35f77c79b441f365499be
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Sep 1 16:15:33 2022 +0200

    [FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests
---
 .../split/PulsarPartitionSplitReaderBase.java      |  9 +++---
 .../source/PulsarOrderedSourceReaderTest.java      | 32 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index c1459a6..b884c57 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -162,7 +162,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
         List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
         Preconditions.checkArgument(
-                newSplits.size() == 1, "This pulsar split reader only support one split.");
+                newSplits.size() == 1, "This pulsar split reader only supports one split.");
         this.registeredSplit = newSplits.get(0);
 
         // Open stop cursor.
@@ -184,9 +184,10 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
     public void pauseOrResumeSplits(
             Collection<PulsarPartitionSplit> splitsToPause,
             Collection<PulsarPartitionSplit> splitsToResume) {
-        if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
-            throw new IllegalStateException("This pulsar split reader only support one split.");
-        }
+        // This shouldn't happen but just in case...
+        Preconditions.checkState(
+                splitsToPause.size() + splitsToResume.size() <= 1,
+                "This pulsar split reader only supports one split.");
 
         if (!splitsToPause.isEmpty()) {
             pulsarConsumer.pause();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
index 9806108..4ec1912 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -29,11 +29,13 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -130,6 +132,36 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
         }
     }
 
+    @TestTemplate
+    @Timeout(600)
+    void supportsPausingOrResumingSplits(
+            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
+            throws Exception {
+        final PulsarPartitionSplit split =
+                createPartitionSplit(topicName, 0, boundedness, MessageId.earliest);
+
+        reader.addSplits(Collections.singletonList(split));
+
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+
+        reader.pauseOrResumeSplits(
+                Collections.singletonList(split.splitId()), Collections.emptyList());
+
+        InputStatus status = reader.pollNext(output);
+        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+
+        reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton(split.splitId()));
+
+        do {
+            status = reader.pollNext(output);
+            Thread.sleep(5);
+        } while (status != InputStatus.MORE_AVAILABLE);
+
+        assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE);
+
+        reader.close();
+    }
+
     private void setupSourceReader(
             PulsarSourceReaderBase<Integer> reader,
             String topicName,