You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/08/12 12:48:42 UTC
[flink] branch master updated: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 18d21a0618f [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)
18d21a0618f is described below
commit 18d21a0618f2195d4279828f610094ffccd052b3
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Aug 12 20:48:34 2022 +0800
[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead of seeking every time. This should fix the incorrect initial position.
* Fix the incorrect stop cursor so that it stops at the correct position.
* Drop Consumer.seek() for https://github.com/apache/pulsar/pull/16171
---
.../docs/connectors/datastream/pulsar.md | 30 ++-
docs/content/docs/connectors/datastream/pulsar.md | 37 +++-
.../generated/pulsar_consumer_configuration.html | 6 -
.../connector/pulsar/source/PulsarSource.java | 14 +-
.../pulsar/source/PulsarSourceBuilder.java | 8 +-
.../pulsar/source/PulsarSourceOptions.java | 7 +
.../source/config/PulsarSourceConfigUtils.java | 3 -
.../pulsar/source/config/SourceConfiguration.java | 4 +
.../source/enumerator/PulsarSourceEnumState.java | 16 +-
.../source/enumerator/PulsarSourceEnumerator.java | 148 ++++++-------
.../source/enumerator/SplitsAssignmentState.java | 239 ---------------------
.../assigner/NonSharedSplitAssigner.java | 126 +++++++++++
.../enumerator/assigner/SharedSplitAssigner.java | 148 +++++++++++++
.../source/enumerator/assigner/SplitAssigner.java | 64 ++++++
.../enumerator/assigner/SplitAssignerFactory.java | 65 ++++++
.../source/enumerator/cursor/CursorPosition.java | 36 ++--
.../source/enumerator/cursor/MessageIdUtils.java | 71 ++++++
.../source/enumerator/cursor/StartCursor.java | 32 ++-
.../source/enumerator/cursor/StopCursor.java | 96 +++++++--
.../cursor/start/MessageIdStartCursor.java | 24 +--
.../cursor/start/TimestampStartCursor.java | 4 +-
.../cursor/stop/EventTimestampStopCursor.java | 9 +-
.../cursor/stop/LatestMessageStopCursor.java | 21 +-
.../cursor/stop/MessageIdStopCursor.java | 32 +--
.../enumerator/cursor/stop/NeverStopCursor.java | 4 +-
.../cursor/stop/PublishTimestampStopCursor.java | 9 +-
.../enumerator/subscriber/PulsarSubscriber.java | 2 +-
.../source/reader/PulsarSourceReaderFactory.java | 2 +-
.../split/PulsarOrderedPartitionSplitReader.java | 42 +++-
.../split/PulsarPartitionSplitReaderBase.java | 55 ++---
.../split/PulsarUnorderedPartitionSplitReader.java | 2 +-
.../pulsar/source/split/PulsarPartitionSplit.java | 4 +-
.../pulsar/source/PulsarSourceITCase.java | 20 --
.../PulsarSourceEnumStateSerializerTest.java | 5 +-
.../enumerator/PulsarSourceEnumeratorTest.java | 10 +-
.../enumerator/SplitsAssignmentStateTest.java | 119 ----------
.../assigner/NonSharedSplitAssignerTest.java | 95 ++++++++
.../assigner/SharedSplitAssignerTest.java | 98 +++++++++
.../enumerator/assigner/SplitAssignerTestBase.java | 113 ++++++++++
.../source/enumerator/cursor/StopCursorTest.java | 11 +-
.../split/PulsarPartitionSplitReaderTestBase.java | 8 +-
.../pulsar/testutils/PulsarTestContext.java | 10 +-
.../testutils/runtime/PulsarRuntimeOperator.java | 80 +++----
43 files changed, 1231 insertions(+), 698 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index eace8bb8fc5..7287ee3f3d4 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -322,6 +322,7 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
```
{{< /tab >}}
{{< /tabs >}}
+
- 与前者不同的是,给定的消息可以跳过,再进行消费。
{{< tabs "pulsar-starting-position-from-message-id-bool" >}}
{{< tab "Java" >}}
@@ -335,7 +336,8 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
```
{{< /tab >}}
{{< /tabs >}}
-- 从给定的消息时间开始消费。
+
+- 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 `StartCursor.fromPublishTime(long)`。
{{< tabs "pulsar-starting-position-message-time" >}}
{{< tab "Java" >}}
```java
@@ -349,6 +351,11 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
{{< /tab >}}
{{< /tabs >}}
+- 从给定的消息发布时间开始消费。
+ ```java
+ StartCursor.fromPublishTime(long);
+ ```
+
{{< hint info >}}
每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。
@@ -404,6 +411,7 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
```
{{< /tab >}}
{{< /tabs >}}
+
- 停止于某条消息之后,结果里包含此消息。
{{< tabs "pulsar-boundedness-after-message-id" >}}
{{< tab "Java" >}}
@@ -417,7 +425,18 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
```
{{< /tab >}}
{{< /tabs >}}
-- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
+
+- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里不包含此时间戳的消息。
+ ```java
+ StopCursor.atEventTime(long);
+ ```
+
+- 停止于某个给定的消息事件时间戳之后,比如 `Message<byte[]>.getEventTime()`,消费结果里包含此时间戳的消息。
+ ```java
+ StopCursor.afterEventTime(long);
+ ```
+
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里不包含此时间戳的消息。
{{< tabs "pulsar-boundedness-publish-time" >}}
{{< tab "Java" >}}
```java
@@ -431,9 +450,10 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
{{< /tab >}}
{{< /tabs >}}
- {{< hint warning >}}
- StopCursor.atEventTime(long) 目前已经处于弃用状态。
- {{< /hint >}}
+- 停止于某个给定的消息发布时间戳之后,比如 `Message<byte[]>.getPublishTime()`,消费结果里包含此时间戳的消息。
+ ```java
+ StopCursor.afterPublishTime(long);
+ ```
### Source 配置项
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 391fb97a1bf..9311c62328f 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -356,6 +356,7 @@ The Pulsar connector consumes from the latest available message if the message I
```
{{< /tab >}}
{{< /tabs >}}
+
- Start from a specified message between the earliest and the latest.
The Pulsar connector consumes from the latest available message if the message ID doesn't exist.
@@ -373,7 +374,10 @@ The Pulsar connector consumes from the latest available message if the message I
{{< /tab >}}
{{< /tabs >}}
-- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+This method is deprecated because its name is misleading and may cause confusion.
+You can use `StartCursor.fromPublishTime(long)` instead.
+
{{< tabs "pulsar-starting-position-message-time" >}}
{{< tab "Java" >}}
```java
@@ -387,6 +391,11 @@ The Pulsar connector consumes from the latest available message if the message I
{{< /tab >}}
{{< /tabs >}}
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+ ```java
+ StartCursor.fromPublishTime(long);
+ ```
+
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
@@ -420,7 +429,7 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}
-- Stop at the latest available message when the Pulsar source starts consuming messages.
+- Stop at the latest available message when the Pulsar source starts consuming messages.
{{< tabs "pulsar-boundedness-latest" >}}
{{< tab "Java" >}}
```java
@@ -447,6 +456,7 @@ Built-in stop cursors include:
```
{{< /tab >}}
{{< /tabs >}}
+
- Stop but include the given message in the consuming result.
{{< tabs "pulsar-boundedness-after-message-id" >}}
{{< tab "Java" >}}
@@ -461,7 +471,20 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}
-- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time won't be included in the consuming result.
+ ```java
+ StopCursor.atEventTime(long);
+ ```
+
+- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time will be included in the consuming result.
+ ```java
+ StopCursor.afterEventTime(long);
+ ```
+
+- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time won't be included in the consuming result.
{{< tabs "pulsar-boundedness-publish-time" >}}
{{< tab "Java" >}}
```java
@@ -475,9 +498,11 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}
- {{< hint warning >}}
- StopCursor.atEventTime(long) is now deprecated.
- {{< /hint >}}
+- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time will be included in the consuming result.
+ ```java
+ StopCursor.afterPublishTime(long);
+ ```
### Source Configurable Options
diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
index bc8b6df4060..4e05b270de4 100644
--- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -140,12 +140,6 @@ C5, 1, 1
<td>Boolean</td>
<td>If enabled, the consumer will automatically retry messages.</td>
</tr>
- <tr>
- <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
- <td style="word-wrap: break-word;">Latest</td>
- <td><p>Enum</p></td>
- <td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
- </tr>
<tr>
<td><h5>pulsar.consumer.subscriptionMode</h5></td>
<td style="word-wrap: break-word;">Durable</td>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index a6c48d14bc8..6f4775df366 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -32,7 +32,8 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -142,15 +143,14 @@ public final class PulsarSource<OUT>
@Override
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
- SplitsAssignmentState assignmentState =
- new SplitsAssignmentState(stopCursor, sourceConfiguration);
+ SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
sourceConfiguration,
enumContext,
- assignmentState);
+ splitAssigner);
}
@Internal
@@ -158,15 +158,15 @@ public final class PulsarSource<OUT>
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
PulsarSourceEnumState checkpoint) {
- SplitsAssignmentState assignmentState =
- new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
+ SplitAssigner splitAssigner =
+ SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
sourceConfiguration,
enumContext,
- assignmentState);
+ splitAssigner);
}
@Internal
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index ed68215819f..1a5d6ea9758 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -97,7 +97,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
* discovery always expected new splits to come and not exiting. To disable auto partition
* discovery, use builder.setConfig({@link
- * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
*
* <pre>{@code
* PulsarSource<String> source = PulsarSource
@@ -266,7 +266,7 @@ public final class PulsarSourceBuilder<OUT> {
}
/**
- * The consumer name is informative and it can be used to identify a particular consumer
+ * The consumer name is informative, and it can be used to identify a particular consumer
* instance from the topic stats.
*/
public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public final class PulsarSourceBuilder<OUT> {
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
* discovery always expected new splits to come and not exiting. To disable auto partition
* discovery, use builder.setConfig({@link
- * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
*
* @param stopCursor The {@link StopCursor} to specify the stopping offset.
* @return this PulsarSourceBuilder.
@@ -334,7 +334,7 @@ public final class PulsarSourceBuilder<OUT> {
}
/**
- * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+ * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
* and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
* {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
* to specify the stopping offsets for each partition. When all the partitions have reached
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 39a73974f5c..c80ddd3b1fa 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -503,6 +504,12 @@ public final class PulsarSourceOptions {
code("PulsarClientException"))
.build());
+ /**
+ * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
+ * Pulsar didn't support this config option before 1.10.1, so we have to remove this config
+ * option.
+ */
+ @Deprecated
public static final ConfigOption<SubscriptionInitialPosition>
PULSAR_SUBSCRIPTION_INITIAL_POSITION =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 602a1577938..0a4dc31e8d3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -59,7 +59,6 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
@@ -113,8 +112,6 @@ public final class PulsarSourceConfigUtils {
builder::consumerName);
configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
- configuration.useOption(
- PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
configuration.useOption(
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 24e7ec0c9c0..f957e1a5775 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -83,6 +83,10 @@ public class SourceConfiguration extends PulsarConfiguration {
return messageQueueCapacity;
}
+ /**
+ * We would override the interval into a negative number when we set the connector with bounded
+ * stop cursor.
+ */
public boolean isEnablePartitionDiscovery() {
return getPartitionDiscoveryIntervalMs() > 0;
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
index dbab9e21781..56bbbd20a32 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
@@ -18,15 +18,18 @@
package org.apache.flink.connector.pulsar.source.enumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* The state class for pulsar source enumerator, used for storing the split state. This class is
- * managed and controlled by {@link SplitsAssignmentState}.
+ * managed and controlled by {@link SplitAssigner}.
*/
public class PulsarSourceEnumState {
@@ -46,11 +49,12 @@ public class PulsarSourceEnumState {
private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
/**
- * A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for
- * recording assign status.
+ * It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for
+ * all flink readers. Using this map for recording assign status.
*/
private final Map<Integer, Set<String>> readerAssignedSplits;
+ /** The pipeline has been triggered and topic partitions have been assigned to readers. */
private final boolean initialized;
public PulsarSourceEnumState(
@@ -85,4 +89,10 @@ public class PulsarSourceEnumState {
public boolean isInitialized() {
return initialized;
}
+
+ /** The initial assignment state for Pulsar. */
+ public static PulsarSourceEnumState initialState() {
+ return new PulsarSourceEnumState(
+ new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false);
+ }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 7890dcf1847..a64e9272c70 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -22,40 +22,29 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.KeySharedPolicy;
-import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Range;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static java.util.Collections.singletonList;
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
-import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
-import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
/** The enumerator class for pulsar source. */
@Internal
@@ -65,13 +54,12 @@ public class PulsarSourceEnumerator
private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
private final PulsarAdmin pulsarAdmin;
- private final PulsarClient pulsarClient;
private final PulsarSubscriber subscriber;
private final StartCursor startCursor;
private final RangeGenerator rangeGenerator;
private final SourceConfiguration sourceConfiguration;
private final SplitEnumeratorContext<PulsarPartitionSplit> context;
- private final SplitsAssignmentState assignmentState;
+ private final SplitAssigner splitAssigner;
public PulsarSourceEnumerator(
PulsarSubscriber subscriber,
@@ -79,15 +67,14 @@ public class PulsarSourceEnumerator
RangeGenerator rangeGenerator,
SourceConfiguration sourceConfiguration,
SplitEnumeratorContext<PulsarPartitionSplit> context,
- SplitsAssignmentState assignmentState) {
+ SplitAssigner splitAssigner) {
this.pulsarAdmin = createAdmin(sourceConfiguration);
- this.pulsarClient = createClient(sourceConfiguration);
this.subscriber = subscriber;
this.startCursor = startCursor;
this.rangeGenerator = rangeGenerator;
this.sourceConfiguration = sourceConfiguration;
this.context = context;
- this.assignmentState = assignmentState;
+ this.splitAssigner = splitAssigner;
}
@Override
@@ -123,9 +110,9 @@ public class PulsarSourceEnumerator
@Override
public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
// Put the split back to current pending splits.
- assignmentState.putSplitsBackToPendingList(splits, subtaskId);
+ splitAssigner.addSplitsBack(splits, subtaskId);
- // If the failed subtask has already restarted, we need to assign pending splits to it
+ // If the failed subtask has already restarted, we need to assign pending splits to it.
if (context.registeredReaders().containsKey(subtaskId)) {
assignPendingPartitionSplits(singletonList(subtaskId));
}
@@ -142,7 +129,7 @@ public class PulsarSourceEnumerator
@Override
public PulsarSourceEnumState snapshotState(long checkpointId) {
- return assignmentState.snapshotState();
+ return splitAssigner.snapshotState();
}
@Override
@@ -164,54 +151,7 @@ public class PulsarSourceEnumerator
*/
private Set<TopicPartition> getSubscribedTopicPartitions() {
int parallelism = context.currentParallelism();
- Set<TopicPartition> partitions =
- subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
-
- // Seek start position for given partitions.
- seekStartPosition(partitions);
-
- return partitions;
- }
-
- private void seekStartPosition(Set<TopicPartition> partitions) {
- ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
- Set<String> seekedTopics = new HashSet<>();
-
- for (TopicPartition partition : partitions) {
- String topicName = partition.getFullTopicName();
- if (!assignmentState.containsTopic(topicName) && seekedTopics.add(topicName)) {
- try (Consumer<byte[]> consumer =
- sneakyClient(() -> consumerBuilder.clone().topic(topicName).subscribe())) {
- startCursor.seekPosition(
- partition.getTopic(), partition.getPartitionId(), consumer);
- } catch (PulsarClientException e) {
- if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
- throw new IllegalArgumentException(e);
- } else {
- // WARN_ON_MISMATCH would just print this warning message.
- // No need to print the stacktrace.
- LOG.warn(
- "Failed to set initial consuming position for partition {}",
- partition,
- e);
- }
- }
- }
- }
- }
-
- private ConsumerBuilder<byte[]> consumerBuilder() {
- ConsumerBuilder<byte[]> builder =
- createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration);
- if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
- Range range = TopicRange.createFullRange().toPulsarRange();
- KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
- // Force this consume use sticky hash range in Key_Shared subscription.
- // Pulsar won't remove old message dispatcher before 2.8.2 release.
- builder.keySharedPolicy(keySharedPolicy);
- }
-
- return builder;
+ return subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
}
/**
@@ -230,13 +170,55 @@ public class PulsarSourceEnumerator
}
// Append the partitions into current assignment state.
- assignmentState.appendTopicPartitions(fetchedPartitions);
- List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
+ List<TopicPartition> newPartitions =
+ splitAssigner.registerTopicPartitions(fetchedPartitions);
+ createSubscription(newPartitions);
// Assign the new readers.
+ List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
assignPendingPartitionSplits(registeredReaders);
}
+ /** Create subscription on topic partition if it doesn't exist. */
+ private void createSubscription(List<TopicPartition> newPartitions) {
+ for (TopicPartition partition : newPartitions) {
+ String topicName = partition.getFullTopicName();
+ String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+ List<String> subscriptions =
+ sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
+ if (!subscriptions.contains(subscriptionName)) {
+ CursorPosition position =
+ startCursor.position(partition.getTopic(), partition.getPartitionId());
+ MessageId initialPosition = queryInitialPosition(topicName, position);
+
+ sneakyAdmin(
+ () ->
+ pulsarAdmin
+ .topics()
+ .createSubscription(
+ topicName, subscriptionName, initialPosition));
+ }
+ }
+ }
+
+ /** Query the available message id from Pulsar. */
+ private MessageId queryInitialPosition(String topicName, CursorPosition position) {
+ CursorPosition.Type type = position.getType();
+ if (type == CursorPosition.Type.TIMESTAMP) {
+ return sneakyAdmin(
+ () ->
+ pulsarAdmin
+ .topics()
+ .getMessageIdByTimestamp(topicName, position.getTimestamp()));
+ } else if (type == CursorPosition.Type.MESSAGE_ID) {
+ return position.getMessageId();
+ } else {
+ throw new UnsupportedOperationException("We don't support this seek type " + type);
+ }
+ }
+
+ /** Query the unassigned splits and assign them to the available readers. */
private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
// Validate the reader.
pendingReaders.forEach(
@@ -248,17 +230,19 @@ public class PulsarSourceEnumerator
});
// Assign splits to downstream readers.
- assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+ splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
// If periodically partition discovery is disabled and the initializing discovery has done,
- // signal NoMoreSplitsEvent to pending readers
- if (assignmentState.noMoreNewPartitionSplits()) {
- LOG.debug(
- "No more PulsarPartitionSplits to assign."
- + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
- pendingReaders,
- sourceConfiguration.getSubscriptionDesc());
- pendingReaders.forEach(this.context::signalNoMoreSplits);
+ // signal NoMoreSplitsEvent to pending readers.
+ for (Integer reader : pendingReaders) {
+ if (splitAssigner.noMoreSplits(reader)) {
+ LOG.debug(
+ "No more PulsarPartitionSplits to assign."
+ + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+ reader,
+ sourceConfiguration.getSubscriptionDesc());
+ context.signalNoMoreSplits(reader);
+ }
}
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
deleted file mode 100644
index cbc4826583a..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-
-/** The state class for recording the split assignment. */
-@Internal
-public class SplitsAssignmentState {
-
- private final StopCursor stopCursor;
- private final SourceConfiguration sourceConfiguration;
-
- // The dynamic states for checkpoint.
- private final Set<TopicPartition> appendedPartitions;
- // This pending splits is used for Key_Shared, Failover, Exclusive subscription.
- private final Set<PulsarPartitionSplit> pendingPartitionSplits;
- // These two fields are used for Shared subscription.
- private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
- private final Map<Integer, Set<String>> readerAssignedSplits;
- private boolean initialized;
-
- public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
- this.stopCursor = stopCursor;
- this.sourceConfiguration = sourceConfiguration;
- this.appendedPartitions = new HashSet<>();
- this.pendingPartitionSplits = new HashSet<>();
- this.sharedPendingPartitionSplits = new HashMap<>();
- this.readerAssignedSplits = new HashMap<>();
- this.initialized = false;
- }
-
- public SplitsAssignmentState(
- StopCursor stopCursor,
- SourceConfiguration sourceConfiguration,
- PulsarSourceEnumState sourceEnumState) {
- this.stopCursor = stopCursor;
- this.sourceConfiguration = sourceConfiguration;
- this.appendedPartitions = sourceEnumState.getAppendedPartitions();
- this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
- this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
- this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
- this.initialized = sourceEnumState.isInitialized();
- }
-
- public PulsarSourceEnumState snapshotState() {
- return new PulsarSourceEnumState(
- appendedPartitions,
- pendingPartitionSplits,
- sharedPendingPartitionSplits,
- readerAssignedSplits,
- initialized);
- }
-
- /**
- * Append the new fetched partitions to current state. We would generate pending source split
- * for downstream pulsar readers. Since the {@link SplitEnumeratorContext} don't support put the
- * split back to enumerator, we don't support partition deletion.
- *
- * @param fetchedPartitions The partitions from the {@link PulsarSubscriber}.
- */
- public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
- for (TopicPartition partition : fetchedPartitions) {
- // If this partition is a new partition.
- if (!appendedPartitions.contains(partition)) {
- if (!sharePartition()) {
- // Create a split and add it to pending list.
- pendingPartitionSplits.add(createSplit(partition));
- }
-
- // Shared subscription don't create splits, we just register partitions.
- appendedPartitions.add(partition);
- }
- }
-
- // Update this initialize flag.
- if (!initialized) {
- this.initialized = true;
- }
- }
-
- public boolean containsTopic(String topicName) {
- return appendedPartitions.stream()
- .anyMatch(partition -> Objects.equals(partition.getFullTopicName(), topicName));
- }
-
- /** Put these splits back to pending list. */
- public void putSplitsBackToPendingList(List<PulsarPartitionSplit> splits, int readerId) {
- if (!sharePartition()) {
- // Put these splits back to normal pending list.
- pendingPartitionSplits.addAll(splits);
- } else {
- // Put the splits back to shared pending list.
- Set<PulsarPartitionSplit> pending =
- sharedPendingPartitionSplits.computeIfAbsent(readerId, id -> new HashSet<>());
- pending.addAll(splits);
- }
- }
-
- public Optional<SplitsAssignment<PulsarPartitionSplit>> assignSplits(
- List<Integer> pendingReaders) {
- // Avoid empty readers assign.
- if (pendingReaders.isEmpty()) {
- return Optional.empty();
- }
-
- Map<Integer, List<PulsarPartitionSplit>> assignMap;
-
- // We extract the assign logic into two method for better readability.
- if (!sharePartition()) {
- assignMap = assignNormalSplits(pendingReaders);
- } else {
- assignMap = assignSharedSplits(pendingReaders);
- }
-
- if (assignMap.isEmpty()) {
- return Optional.empty();
- } else {
- return Optional.of(new SplitsAssignment<>(assignMap));
- }
- }
-
- /**
- * @return It would return true only if periodically partition discovery is disabled, the
- * initializing partition discovery has finished AND there is no pending splits for
- * assignment.
- */
- public boolean noMoreNewPartitionSplits() {
- return !sourceConfiguration.isEnablePartitionDiscovery()
- && initialized
- && pendingPartitionSplits.isEmpty();
- }
-
- // ----------------- private methods -------------------
-
- /** The splits don't shared for all the readers. */
- private Map<Integer, List<PulsarPartitionSplit>> assignNormalSplits(
- List<Integer> pendingReaders) {
- Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
- // Drain a list of splits.
- List<PulsarPartitionSplit> pendingSplits = drainPendingPartitionsSplits();
- for (int i = 0; i < pendingSplits.size(); i++) {
- PulsarPartitionSplit split = pendingSplits.get(i);
- int readerId = pendingReaders.get(i % pendingReaders.size());
- assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
- }
-
- return assignMap;
- }
-
- /** Every split would be shared among available readers. */
- private Map<Integer, List<PulsarPartitionSplit>> assignSharedSplits(
- List<Integer> pendingReaders) {
- Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
- // Drain the splits from share pending list.
- for (Integer reader : pendingReaders) {
- Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
- if (pendingSplits == null) {
- pendingSplits = new HashSet<>();
- }
-
- Set<String> assignedSplits =
- readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
-
- for (TopicPartition partition : appendedPartitions) {
- String partitionName = partition.toString();
- if (!assignedSplits.contains(partitionName)) {
- pendingSplits.add(createSplit(partition));
- assignedSplits.add(partitionName);
- }
- }
-
- if (!pendingSplits.isEmpty()) {
- assignMap.put(reader, new ArrayList<>(pendingSplits));
- }
- }
-
- return assignMap;
- }
-
- private PulsarPartitionSplit createSplit(TopicPartition partition) {
- try {
- StopCursor stop = InstantiationUtil.clone(stopCursor);
- return new PulsarPartitionSplit(partition, stop);
- } catch (IOException | ClassNotFoundException e) {
- throw new IllegalStateException(e);
- }
- }
-
- private List<PulsarPartitionSplit> drainPendingPartitionsSplits() {
- List<PulsarPartitionSplit> splits = new ArrayList<>(pendingPartitionSplits);
- pendingPartitionSplits.clear();
-
- return splits;
- }
-
- /** {@link SubscriptionType#Shared} mode should share a same split for all the readers. */
- private boolean sharePartition() {
- return sourceConfiguration.getSubscriptionType() == SubscriptionType.Shared;
- }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
new file mode 100644
index 00000000000..087e96157d6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Split assigner for the {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
+ * and {@link SubscriptionType#Key_Shared} subscriptions. Every partition becomes exactly one
+ * split, and each split is handed to a single reader.
+ */
+@Internal
+public class NonSharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // Checkpointed state: all partitions seen so far, the splits still waiting for a reader and
+    // whether the first partition discovery has completed.
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NonSharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    /** Queues one split per previously unseen partition and returns those partitions. */
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> freshPartitions = new ArrayList<>();
+
+        for (TopicPartition candidate : fetchedPartitions) {
+            if (appendedPartitions.contains(candidate)) {
+                continue;
+            }
+            pendingPartitionSplits.add(new PulsarPartitionSplit(candidate, stopCursor));
+            appendedPartitions.add(candidate);
+            freshPartitions.add(candidate);
+        }
+
+        // The first registration marks the initial partition discovery as done.
+        initialized = true;
+
+        return freshPartitions;
+    }
+
+    /** Returned splits are not tied to the reader that gave them back. */
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    /** Spreads all pending splits round-robin over the given readers and drains the queue. */
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignment = new HashMap<>();
+        int slot = 0;
+        for (PulsarPartitionSplit split : pendingPartitionSplits) {
+            Integer owner = readers.get(slot % readers.size());
+            assignment.computeIfAbsent(owner, ignored -> new ArrayList<>()).add(split);
+            slot++;
+        }
+        pendingPartitionSplits.clear();
+
+        return Optional.of(new SplitsAssignment<>(assignment));
+    }
+
+    /**
+     * The answer is the same for every reader: true only when partition discovery is disabled,
+     * the initial discovery has run and no split is waiting for assignment.
+     */
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        if (enablePartitionDiscovery || !initialized) {
+            return false;
+        }
+        return pendingPartitionSplits.isEmpty();
+    }
+
+    /** The Shared-subscription parts of the state are unused here and stored as empty maps. */
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                pendingPartitionSplits,
+                new HashMap<>(),
+                new HashMap<>(),
+                initialized);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
new file mode 100644
index 00000000000..48d75c8dee3
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Split assigner for the {@link SubscriptionType#Shared} subscription. Every reader gets its
+ * own split for each registered partition, i.e. a partition is assigned to every reader.
+ */
+@Internal
+public class SharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8468503133499402491L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // Checkpointed state: all partitions seen so far, splits given back by readers (keyed by
+    // reader id), the partition names already assigned to each reader and whether the first
+    // partition discovery has completed.
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
+    private final Map<Integer, Set<String>> readerAssignedSplits;
+    private boolean initialized;
+
+    public SharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
+        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    /** Records previously unseen partitions; splits are created per reader on assignment. */
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> freshPartitions = new ArrayList<>();
+
+        for (TopicPartition candidate : fetchedPartitions) {
+            // Set#add returns false for partitions that are already registered.
+            if (appendedPartitions.add(candidate)) {
+                freshPartitions.add(candidate);
+            }
+        }
+
+        // The first registration marks the initial partition discovery as done.
+        initialized = true;
+
+        return freshPartitions;
+    }
+
+    /** Keeps given-back splits per reader; they are re-sent to that reader on assignment. */
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        sharedPendingPartitionSplits
+                .computeIfAbsent(subtaskId, id -> new HashSet<>())
+                .addAll(splits);
+    }
+
+    /**
+     * Gives each reader its given-back splits plus a new split for every registered partition it
+     * has not been assigned yet. Readers with nothing to receive are left out.
+     */
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignment = new HashMap<>();
+        for (Integer reader : readers) {
+            List<PulsarPartitionSplit> splits = collectSplitsFor(reader);
+            if (!splits.isEmpty()) {
+                assignment.put(reader, splits);
+            }
+        }
+
+        if (assignment.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(new SplitsAssignment<>(assignment));
+    }
+
+    /**
+     * True only when partition discovery is disabled, the initial discovery has run, no split
+     * given back by this reader is waiting and it holds as many partitions as are registered.
+     */
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        if (enablePartitionDiscovery || !initialized) {
+            return false;
+        }
+
+        Set<PulsarPartitionSplit> givenBack = sharedPendingPartitionSplits.get(reader);
+        if (givenBack != null && !givenBack.isEmpty()) {
+            return false;
+        }
+
+        Set<String> assigned = readerAssignedSplits.get(reader);
+        return assigned != null && assigned.size() == appendedPartitions.size();
+    }
+
+    /** The non-shared pending-split set of the state is unused here and stored empty. */
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                new HashSet<>(),
+                sharedPendingPartitionSplits,
+                readerAssignedSplits,
+                initialized);
+    }
+
+    /** Drains one reader's given-back splits and adds splits for its unassigned partitions. */
+    private List<PulsarPartitionSplit> collectSplitsFor(Integer reader) {
+        Set<PulsarPartitionSplit> outgoing = sharedPendingPartitionSplits.remove(reader);
+        if (outgoing == null) {
+            outgoing = new HashSet<>();
+        }
+
+        // Assigned partitions are tracked by their string form in the checkpointed state.
+        Set<String> alreadyAssigned =
+                readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
+        for (TopicPartition partition : appendedPartitions) {
+            String partitionName = partition.toString();
+            if (!alreadyAssigned.contains(partitionName)) {
+                outgoing.add(new PulsarPartitionSplit(partition, stopCursor));
+                alreadyAssigned.add(partitionName);
+            }
+        }
+
+        return new ArrayList<>(outgoing);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
new file mode 100644
index 00000000000..bc03f5103fd
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The split assigner for the different subscription types. It spreads the splits over the
+ * readers and holds all the assignment state that is stored into the checkpoint.
+ */
+@Internal
+public interface SplitAssigner extends Serializable {
+
+    /**
+     * Add the currently available partitions into the assigner.
+     *
+     * @param fetchedPartitions The available partitions queried from the Pulsar broker.
+     * @return The topic partitions that were not registered before this call.
+     */
+    List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions);
+
+    /**
+     * Add splits back to the assigner, e.g. after a reader failed. The splits are kept as pending
+     * and handed out again by a later {@link #createAssignment(List)} call.
+     *
+     * @param splits The splits to return to the assigner.
+     * @param subtaskId The id of the reader that gave the splits back.
+     */
+    void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId);
+
+    /**
+     * Create a split assignment for the given readers.
+     *
+     * @param readers The ids of the readers that can receive splits.
+     * @return The assignment, or {@link Optional#empty()} if there is nothing to assign.
+     */
+    Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers);
+
+    /**
+     * Check whether "no more splits" can be signalled to the given reader. Implementations return
+     * true only if periodic partition discovery is disabled, the initial partition discovery has
+     * finished AND no split is pending for this reader.
+     *
+     * @param reader The id of the reader to check.
+     * @return {@code true} if the enumerator may send the no-more-splits signal to this reader.
+     */
+    boolean noMoreSplits(Integer reader);
+
+    /** Snapshot the current assignment state into a {@link PulsarSourceEnumState} checkpoint. */
+    PulsarSourceEnumState snapshotState();
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
new file mode 100644
index 00000000000..3e6ebccb49b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+
+/** Creates the {@link SplitAssigner} matching the configured subscription type. */
+@Internal
+public final class SplitAssignerFactory {
+
+    private SplitAssignerFactory() {
+        // Static factory only, never instantiated.
+    }
+
+    /** Creates an assigner that starts from {@code PulsarSourceEnumState.initialState()}. */
+    public static SplitAssigner create(
+            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
+        PulsarSourceEnumState initial = initialState();
+        return create(stopCursor, sourceConfiguration, initial);
+    }
+
+    /**
+     * Creates an assigner that resumes from the given state. {@link SubscriptionType#Shared} gets
+     * a {@link SharedSplitAssigner}, while {@link SubscriptionType#Exclusive}, {@link
+     * SubscriptionType#Failover} and {@link SubscriptionType#Key_Shared} get a {@link
+     * NonSharedSplitAssigner}.
+     *
+     * @throws IllegalArgumentException if the configured subscription type is not supported.
+     */
+    public static SplitAssigner create(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        SubscriptionType type = sourceConfiguration.getSubscriptionType();
+        boolean shared = type == Shared;
+        boolean nonShared = type == Exclusive || type == Failover || type == Key_Shared;
+
+        if (shared) {
+            return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        }
+        if (!nonShared) {
+            throw new IllegalArgumentException(
+                    "We don't support this subscription type: " + type);
+        }
+        return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index a2aaff62906..c965ff962f8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -18,18 +18,16 @@
package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import javax.annotation.Nullable;
import java.io.Serializable;
-/** The class for defining the start or stop position. */
+/**
+ * The class for defining the start or stop position. We only expose the constructor for end user.
+ */
@PublicEvolving
public final class CursorPosition implements Serializable {
private static final long serialVersionUID = -802405183307684549L;
@@ -40,34 +38,31 @@ public final class CursorPosition implements Serializable {
private final Long timestamp;
- public CursorPosition(@Nullable MessageId messageId) {
+ public CursorPosition(MessageId messageId) {
this.type = Type.MESSAGE_ID;
this.messageId = messageId;
this.timestamp = null;
}
- public CursorPosition(@Nullable Long timestamp) {
+ public CursorPosition(Long timestamp) {
this.type = Type.TIMESTAMP;
this.messageId = null;
this.timestamp = timestamp;
}
- @VisibleForTesting
+ @Internal
+ public Type getType() {
+ return type;
+ }
+
+ @Internal
public MessageId getMessageId() {
return messageId;
}
- /** Pulsar consumer could be subscribed by the position. */
- public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
- if (type == Type.MESSAGE_ID) {
- consumer.seek(messageId);
- } else {
- if (timestamp != null) {
- consumer.seek(timestamp);
- } else {
- consumer.seek(System.currentTimeMillis());
- }
- }
+ @Internal
+ public Long getTimestamp() {
+ return timestamp;
}
@Override
@@ -82,6 +77,7 @@ public final class CursorPosition implements Serializable {
/**
* The position type for reader to choose whether timestamp or message id as the start position.
*/
+ @Internal
public enum Type {
TIMESTAMP,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
new file mode 100644
index 00000000000..a8c3a6b2ef2
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The helper class for Pulsar's message id. */
+@Internal
+public final class MessageIdUtils {
+
+    private MessageIdUtils() {
+        // No public constructor.
+    }
+
+    /**
+     * The implementation from <a
+     * href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
+     * code snippet</a> to get next message id.
+     *
+     * <p>A negative entry id moves to entry {@code 0} of the same ledger; otherwise the entry id
+     * is incremented by one. The ledger id and the partition index are kept unchanged.
+     *
+     * @param messageId The current message id, it must pass {@link #unwrapMessageId(MessageId)}.
+     * @return The message id right after the given one.
+     * @throws IllegalArgumentException if the message id is not supported.
+     */
+    public static MessageId nextMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+
+        if (idImpl.getEntryId() < 0) {
+            return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex());
+        } else {
+            return newMessageId(
+                    idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex());
+        }
+    }
+
+    /**
+     * Convert the message id interface to its backend implementation. And check if it's a batch
+     * message id. We don't support the batch message for its low performance now.
+     *
+     * @param messageId The message id to convert.
+     * @return The converted {@link MessageIdImpl}, never {@code null}.
+     * @throws IllegalArgumentException if the message id cannot be converted to a {@link
+     *     MessageIdImpl} (including {@code null}), or is a batch message id whose batch size is
+     *     not one.
+     */
+    public static MessageIdImpl unwrapMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        // convertToMessageIdImpl() may return null for ids it cannot unwrap. Fail with a clear
+        // error here rather than with a NullPointerException later in the caller.
+        checkArgument(idImpl != null, "Unsupported message id: %s", messageId);
+        if (idImpl instanceof BatchMessageIdImpl) {
+            int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
+            checkArgument(batchSize == 1, "We only support normal message id currently.");
+        }
+
+        return idImpl;
+    }
+
+    /** Creates a non-batch message id while exposing only the {@link MessageId} interface. */
+    public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) {
+        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
index af35319a5a7..9c1d699a269 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
@@ -22,9 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.MessageIdStartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.TimestampStartCursor;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.io.Serializable;
@@ -43,13 +41,6 @@ public interface StartCursor extends Serializable {
CursorPosition position(String topic, int partitionId);
- /** Helper method for seek the right position for given pulsar consumer. */
- default void seekPosition(String topic, int partitionId, Consumer<?> consumer)
- throws PulsarClientException {
- CursorPosition position = position(topic, partitionId);
- position.seekPosition(consumer);
- }
-
// --------------------------- Static Factory Methods -----------------------------
static StartCursor defaultStartCursor() {
@@ -64,19 +55,38 @@ public interface StartCursor extends Serializable {
return fromMessageId(MessageId.latest);
}
+ /**
+ * Start consuming from the given message id. A specific message id (anything other than {@link
+ * MessageId#earliest} or {@link MessageId#latest}) is included in the consuming result; use
+ * {@link #fromMessageId(MessageId, boolean)} to exclude it.
+ */
static StartCursor fromMessageId(MessageId messageId) {
return fromMessageId(messageId, true);
}
/**
+ * Start consuming from the given message id, optionally skipping that message itself.
+ *
* @param messageId Find the available message id and start consuming from it.
- * @param inclusive {@code true} would include the given message id.
+ * @param inclusive {@code true} would include the given message id if it's not the {@link
+ * MessageId#earliest} or {@link MessageId#latest}.
*/
static StartCursor fromMessageId(MessageId messageId, boolean inclusive) {
return new MessageIdStartCursor(messageId, inclusive);
}
+ /**
+ * This method was designed for seeking messages by event time. But Pulsar doesn't support
+ * seeking by event time; it seeks the position by publish time instead. We only keep this
+ * method for backward compatibility, and it simply delegates to {@link #fromPublishTime(long)}.
+ *
+ * @deprecated Use {@link #fromPublishTime(long)} instead.
+ */
+ @Deprecated
static StartCursor fromMessageTime(long timestamp) {
- return new TimestampStartCursor(timestamp);
+ return fromPublishTime(timestamp);
+ }
+
+ /**
+ * Seek the start position by using message publish time.
+ *
+ * @param timestamp The publish time to start consuming from.
+ */
+ static StartCursor fromPublishTime(long timestamp) {
+ return new TimestampStartCursor(timestamp, true);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index 0bf46ce1282..d44c78fcf1a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -44,11 +44,55 @@ public interface StopCursor extends Serializable {
/** The open method for the cursor initializer. This method could be executed multiple times. */
default void open(PulsarAdmin admin, TopicPartition partition) {}
- /**
- * Determine whether to pause consumption on the current message by the returned boolean value.
- * The message presented in method argument wouldn't be consumed if the return result is true.
- */
- boolean shouldStop(Message<?> message);
+ /**
+ * Decide how to handle the upcoming message: {@link StopCondition#CONTINUE} consumes it and
+ * keeps going, {@link StopCondition#EXACTLY} consumes it and then stops, and {@link
+ * StopCondition#TERMINATE} stops without consuming it.
+ */
+ StopCondition shouldStop(Message<?> message);
+
+ /** The conditions that control the stop behavior of the pulsar source. */
+ @PublicEvolving
+ enum StopCondition {
+
+ /** Keep consuming: this message should be included in the result. */
+ CONTINUE,
+ /** Include this message in the result, then stop consuming. */
+ EXACTLY,
+ /** Stop consuming; this message is not included in the result. */
+ TERMINATE;
+
+ /**
+ * Decide the stop condition by comparing message ids.
+ *
+ * @param desired The stop goal of the message id.
+ * @param current The upcoming message id.
+ * @param inclusive Whether the desired message is included in the consuming result.
+ */
+ public static StopCondition compare(
+ MessageId desired, MessageId current, boolean inclusive) {
+ return fromComparison(current.compareTo(desired), inclusive);
+ }
+
+ /**
+ * Decide the stop condition by comparing message times.
+ *
+ * @param desired The stop goal of the message time.
+ * @param current The upcoming message time.
+ * @param inclusive Whether the desired message is included in the consuming result.
+ */
+ public static StopCondition compare(long desired, long current, boolean inclusive) {
+ return fromComparison(Long.compare(current, desired), inclusive);
+ }
+
+ /** Map the sign of "current compared to desired" onto a stop condition. */
+ private static StopCondition fromComparison(int comparison, boolean inclusive) {
+ if (comparison < 0) {
+ return CONTINUE;
+ }
+ if (comparison > 0) {
+ return TERMINATE;
+ }
+ return inclusive ? EXACTLY : TERMINATE;
+ }
+ }
// --------------------------- Static Factory Methods -----------------------------
@@ -61,32 +105,52 @@ public interface StopCursor extends Serializable {
}
+ /**
+ * Stop consuming after the last message available in the topic partition at the time the
+ * cursor is opened. That last message is consumed.
+ */
static StopCursor latest() {
- return new LatestMessageStopCursor();
+ return new LatestMessageStopCursor(true);
}
/**
- * Stop when the messageId is equal or greater than the specified messageId. Message that is
- * equal to the specified messageId will not be consumed.
+ * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+ * that is equal to the specified messageId will not be consumed.
+ *
+ * <p>{@link MessageId#latest} is resolved to the last message id of the topic partition when
+ * the cursor is opened.
*/
static StopCursor atMessageId(MessageId messageId) {
- return new MessageIdStopCursor(messageId);
+ if (MessageId.latest.equals(messageId)) {
+ return new LatestMessageStopCursor(false);
+ } else {
+ return new MessageIdStopCursor(messageId, false);
+ }
}
/**
- * Stop when the messageId is greater than the specified messageId. Message that is equal to the
- * specified messageId will be consumed.
+ * Stop consuming when the messageId is greater than the specified messageId. Message that is
+ * equal to the specified messageId will be consumed.
+ *
+ * <p>{@link MessageId#latest} is resolved to the last message id of the topic partition when
+ * the cursor is opened, and that message is consumed.
*/
static StopCursor afterMessageId(MessageId messageId) {
- return new MessageIdStopCursor(messageId, false);
+ if (MessageId.latest.equals(messageId)) {
+ return new LatestMessageStopCursor(true);
+ } else {
+ return new MessageIdStopCursor(messageId, true);
+ }
}
- @Deprecated
+ /** Stop consuming when message eventTime is greater than or equal to the specified timestamp. */
static StopCursor atEventTime(long timestamp) {
- return new EventTimestampStopCursor(timestamp);
+ return new EventTimestampStopCursor(timestamp, false);
}
- /** Stop when message publishTime is greater than the specified timestamp. */
+ /** Stop consuming when message eventTime is greater than the specified timestamp. */
+ static StopCursor afterEventTime(long timestamp) {
+ return new EventTimestampStopCursor(timestamp, true);
+ }
+
+ /**
+ * Stop consuming when message publishTime is greater than or equal to the specified timestamp.
+ */
static StopCursor atPublishTime(long timestamp) {
- return new PublishTimestampStopCursor(timestamp);
+ return new PublishTimestampStopCursor(timestamp, false);
+ }
+
+ /** Stop consuming when message publishTime is greater than the specified timestamp. */
+ static StopCursor afterPublishTime(long timestamp) {
+ return new PublishTimestampStopCursor(timestamp, true);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index 71a4eb6a026..03d5ba3e133 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -25,7 +25,8 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
/** This cursor would leave pulsar start consuming from a specific message id. */
public class MessageIdStartCursor implements StartCursor {
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
* code</a> for understanding pulsar internal logic.
*
* @param messageId The message id for start position.
- * @param inclusive Should we include the start message id in consuming result.
+ * @param inclusive Whether we include the start message id in consuming result. This works only
+ * if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+ * MessageId#latest}.
*/
public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
- if (inclusive) {
- this.messageId = messageId;
+ MessageIdImpl idImpl = unwrapMessageId(messageId);
+ // MessageId.earliest / MessageId.latest are kept as-is: the inclusive flag only applies to
+ // a specific message id.
+ if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+ this.messageId = idImpl;
} else {
- checkState(
- messageId instanceof MessageIdImpl,
- "We only support normal message id and batch message id.");
- MessageIdImpl id = (MessageIdImpl) messageId;
- if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
- this.messageId = messageId;
- } else {
- this.messageId =
- new MessageIdImpl(
- id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
- }
+ // Exclusive start: begin from the id right after the given one.
+ this.messageId = nextMessageId(idImpl);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
index eb4ea32ebb6..da51a58e943 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -27,8 +27,8 @@ public class TimestampStartCursor implements StartCursor {
private final long timestamp;
- public TimestampStartCursor(long timestamp) {
- this.timestamp = timestamp;
+ /**
+ * @param timestamp The timestamp to start consuming from.
+ * @param inclusive Whether the given timestamp itself is part of the start range.
+ */
+ public TimestampStartCursor(long timestamp, boolean inclusive) {
+ // An exclusive start is emulated by starting from the next timestamp value.
+ // NOTE(review): timestamp + 1 overflows when timestamp is Long.MAX_VALUE.
+ this.timestamp = inclusive ? timestamp : timestamp + 1;
}
@Override
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
index e425545de44..d2a44ea362d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class EventTimestampStopCursor implements StopCursor {
private static final long serialVersionUID = 2391576769339369027L;
private final long timestamp;
+ private final boolean inclusive;
- public EventTimestampStopCursor(long timestamp) {
+ /**
+ * @param timestamp The event time to stop at.
+ * @param inclusive Whether a message whose event time equals {@code timestamp} is consumed
+ * before stopping.
+ */
+ public EventTimestampStopCursor(long timestamp, boolean inclusive) {
this.timestamp = timestamp;
+ this.inclusive = inclusive;
}
@Override
- public boolean shouldStop(Message<?> message) {
- return message.getEventTime() >= timestamp;
+ public StopCondition shouldStop(Message<?> message) {
+ // NOTE(review): confirm how messages without an explicit event time are reported here.
+ long eventTime = message.getEventTime();
+ return StopCondition.compare(timestamp, eventTime, inclusive);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index 257081f5c89..0de963ed4fb 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -30,24 +30,29 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtil
/**
* A stop cursor that initialize the position to the latest message id. The offsets initialization
* are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
- * PulsarSourceEnumerator}.
+ * PulsarSourceEnumerator}. The {@code inclusive} constructor argument controls whether that
+ * latest message itself is consumed.
*/
public class LatestMessageStopCursor implements StopCursor {
private static final long serialVersionUID = 1702059838323965723L;
+ // Resolved lazily by open(); stays null until the cursor has been opened.
private MessageId messageId;
+ // Whether the latest message itself is consumed before stopping.
+ private final boolean inclusive;
+
+ /**
+ * @param inclusive {@code true} to consume the latest message before stopping, {@code false}
+ * to stop right before it.
+ */
+ public LatestMessageStopCursor(boolean inclusive) {
+ this.inclusive = inclusive;
+ }
+
+ @Override
+ public StopCondition shouldStop(Message<?> message) {
+ // messageId is only assigned by open(), which must run before messages are checked.
+ MessageId current = message.getMessageId();
+ return StopCondition.compare(messageId, current, inclusive);
+ }
@Override
public void open(PulsarAdmin admin, TopicPartition partition) {
+ // Query the last message id only once; later calls keep the already resolved value.
if (messageId == null) {
String topic = partition.getFullTopicName();
- messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
+ this.messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
}
}
-
- @Override
- public boolean shouldStop(Message<?> message) {
- MessageId id = message.getMessageId();
- return id.compareTo(messageId) >= 0;
- }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
index 7af55a00cc0..03d83aa4495 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
@@ -22,6 +22,12 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.MessageId.earliest;
+import static org.apache.pulsar.client.api.MessageId.latest;
/**
* Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
@@ -32,24 +38,22 @@ public class MessageIdStopCursor implements StopCursor {
private final MessageId messageId;
- private final boolean exclusive;
+ private final boolean inclusive;
- public MessageIdStopCursor(MessageId messageId) {
- this(messageId, true);
- }
+ /**
+ * @param messageId The message id to stop at. Must not be {@link MessageId#earliest} or {@link
+ * MessageId#latest}; batch message ids must have a batch size of 1.
+ * @param inclusive Whether the message with the given id is consumed before stopping.
+ * @throws IllegalArgumentException if the given message id is not supported.
+ */
+ public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
+ MessageIdImpl idImpl = unwrapMessageId(messageId);
+ checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported.");
+ checkArgument(
+ !latest.equals(idImpl),
+ "MessageId.latest is not supported, use LatestMessageStopCursor instead.");
- public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
- this.messageId = messageId;
- this.exclusive = exclusive;
+ this.messageId = idImpl;
+ this.inclusive = inclusive;
}
@Override
- public boolean shouldStop(Message<?> message) {
- MessageId id = message.getMessageId();
- if (exclusive) {
- return id.compareTo(messageId) > 0;
- } else {
- return id.compareTo(messageId) >= 0;
- }
+ public StopCondition shouldStop(Message<?> message) {
+ MessageId current = message.getMessageId();
+ return StopCondition.compare(messageId, current, inclusive);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
index ff2c619afb8..3eb035634ae 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
@@ -27,7 +27,7 @@ public class NeverStopCursor implements StopCursor {
private static final long serialVersionUID = -3113601090292771786L;
@Override
- public boolean shouldStop(Message<?> message) {
- return false;
+ public StopCondition shouldStop(Message<?> message) {
+ // Always consume the message and never stop.
+ return StopCondition.CONTINUE;
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
index b598e7addd4..2dfdd765842 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class PublishTimestampStopCursor implements StopCursor {
private static final long serialVersionUID = 4386276745339324527L;
private final long timestamp;
+ private final boolean inclusive;
- public PublishTimestampStopCursor(long timestamp) {
+ /**
+ * @param timestamp The publish time to stop at.
+ * @param inclusive Whether a message whose publish time equals {@code timestamp} is consumed
+ * before stopping.
+ */
+ public PublishTimestampStopCursor(long timestamp, boolean inclusive) {
this.timestamp = timestamp;
+ this.inclusive = inclusive;
}
@Override
- public boolean shouldStop(Message<?> message) {
- return message.getPublishTime() >= timestamp;
+ public StopCondition shouldStop(Message<?> message) {
+ long publishTime = message.getPublishTime();
+ return StopCondition.compare(timestamp, publishTime, inclusive);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
index 08ba1faa442..b8a55bf8a34 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
@@ -48,7 +48,7 @@ public interface PulsarSubscriber extends Serializable {
/**
* Get a set of subscribed {@link TopicPartition}s. The method could throw {@link
- * IllegalStateException}, a extra try catch is required.
+ * IllegalStateException}, an extra try catch is required.
*
* @param pulsarAdmin The admin interface used to retrieve subscribed topic partitions.
* @param rangeGenerator The range for different partitions.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 6a5d5152231..820888a7bf3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -77,7 +77,7 @@ public final class PulsarSourceReaderFactory {
SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
if (subscriptionType == SubscriptionType.Failover
|| subscriptionType == SubscriptionType.Exclusive) {
- // Create a ordered split reader supplier.
+ // Create an ordered split reader supplier.
Supplier<PulsarOrderedPartitionSplitReader<OUT>> splitReaderSupplier =
() ->
new PulsarOrderedPartitionSplitReader<>(
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
index bb6d79641f5..316431aa75e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -20,14 +20,13 @@ package org.apache.flink.connector.pulsar.source.reader.split;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
@@ -36,10 +35,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
/**
* The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link
@@ -75,18 +76,39 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
}
@Override
- protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+ protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
MessageId latestConsumedId = split.getLatestConsumedId();
// Reset the start position for ordered pulsar consumer.
if (latestConsumedId != null) {
- StartCursor startCursor = StartCursor.fromMessageId(latestConsumedId, false);
- TopicPartition partition = split.getPartition();
-
+ LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
try {
- startCursor.seekPosition(
- partition.getTopic(), partition.getPartitionId(), consumer);
- } catch (PulsarClientException e) {
+ MessageId initialPosition;
+ if (MessageId.latest.equals(latestConsumedId)
+ || MessageId.earliest.equals(latestConsumedId)) {
+ // Keep the earliest/latest sentinels as-is (for compatibility). Use equals(): an
+ // id restored from a checkpoint is typically a new instance, so == would miss it
+ // and nextMessageId() would turn the sentinel into an invalid position.
+ initialPosition = latestConsumedId;
+ } else {
+ initialPosition = nextMessageId(latestConsumedId);
+ }
+
+ // Consumer.seek() is not used here until pulsar-client-all 2.12.0 is available.
+ // See https://github.com/apache/pulsar/issues/16757 for more details.
+
+ String topicName = split.getPartition().getFullTopicName();
+ List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+ String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+ if (!subscriptions.contains(subscriptionName)) {
+ // The subscription doesn't exist yet: create it at the initial position.
+ pulsarAdmin
+ .topics()
+ .createSubscription(topicName, subscriptionName, initialPosition);
+ } else {
+ // The subscription already exists: move its cursor to the initial position.
+ pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
+ }
+ } catch (PulsarAdminException e) {
if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
throw new IllegalArgumentException(e);
} else {
@@ -95,7 +117,7 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
LOG.warn(
"Failed to reset cursor to {} on partition {}",
latestConsumedId,
- partition,
+ split.getPartition(),
e);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 37b5630a8d1..2d4214262c8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -52,7 +53,6 @@ import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
@@ -70,7 +70,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
protected final PulsarAdmin pulsarAdmin;
protected final SourceConfiguration sourceConfiguration;
protected final PulsarDeserializationSchema<OUT> deserializationSchema;
- protected final AtomicBoolean wakeup;
protected Consumer<byte[]> pulsarConsumer;
protected PulsarPartitionSplit registeredSplit;
@@ -84,7 +83,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
this.pulsarAdmin = pulsarAdmin;
this.sourceConfiguration = sourceConfiguration;
this.deserializationSchema = deserializationSchema;
- this.wakeup = new AtomicBoolean(false);
}
@Override
@@ -96,9 +94,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
return builder.build();
}
- // Set wakeup to false for start consuming.
- wakeup.compareAndSet(true, false);
-
StopCursor stopCursor = registeredSplit.getStopCursor();
String splitId = registeredSplit.splitId();
PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
@@ -106,9 +101,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
// Consume message from pulsar until it was woke up by flink reader.
for (int messageNum = 0;
- messageNum < sourceConfiguration.getMaxFetchRecords()
- && deadline.hasTimeLeft()
- && isNotWakeup();
+ messageNum < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft();
messageNum++) {
try {
Duration timeout = deadline.timeLeftIfAny();
@@ -117,14 +110,18 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
break;
}
- // Deserialize message.
- collector.setMessage(message);
- deserializationSchema.deserialize(message, collector);
+ StopCondition condition = stopCursor.shouldStop(message);
+
+ if (condition == StopCondition.CONTINUE || condition == StopCondition.EXACTLY) {
+ // Deserialize message.
+ collector.setMessage(message);
+ deserializationSchema.deserialize(message, collector);
- // Acknowledge message if need.
- finishedPollMessage(message);
+ // Acknowledge message if need.
+ finishedPollMessage(message);
+ }
- if (stopCursor.shouldStop(message)) {
+ if (condition == StopCondition.EXACTLY || condition == StopCondition.TERMINATE) {
builder.addFinishedSplit(splitId);
break;
}
@@ -165,23 +162,27 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
newSplits.size() == 1, "This pulsar split reader only support one split.");
PulsarPartitionSplit newSplit = newSplits.get(0);
+ // Open stop cursor.
+ newSplit.open(pulsarAdmin);
+
+ // Before creating the consumer.
+ beforeCreatingConsumer(newSplit);
+
// Create pulsar consumer.
Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
- // Open start & stop cursor.
- newSplit.open(pulsarAdmin);
-
- // Start Consumer.
- startConsumer(newSplit, consumer);
+ // After creating the consumer.
+ afterCreatingConsumer(newSplit, consumer);
LOG.info("Register split {} consumer for current reader.", newSplit);
+
this.registeredSplit = newSplit;
this.pulsarConsumer = consumer;
}
@Override
public void wakeUp() {
- wakeup.compareAndSet(false, true);
+ // No-op: fetch() is bounded by maxFetchRecords and the fetch deadline, so it returns by itself.
}
@Override
@@ -197,14 +198,16 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
protected abstract void finishedPollMessage(Message<byte[]> message);
- protected abstract void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer);
-
- // --------------------------- Helper Methods -----------------------------
+ /**
+ * Hook invoked when a split is added, after its stop cursor is opened and before the Pulsar
+ * consumer for it is created.
+ */
+ protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
+ // Nothing to do by default.
+ }
- protected boolean isNotWakeup() {
- return !wakeup.get();
+ /** Hook invoked right after the Pulsar consumer for a newly added split has been created. */
+ protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+ // Nothing to do by default.
}
+ // --------------------------- Helper Methods -----------------------------
+
/** Create a specified {@link Consumer} by the given split information. */
protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
return createPulsarConsumer(split.getPartition());
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 5940cc9ac19..cc0167d8d14 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -126,7 +126,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
}
@Override
- protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+ protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
// Abort the uncommitted pulsar transaction.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 90e29ca4712..458be403fbf 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -30,13 +30,15 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link SourceSplit} implementation for a Pulsar's partition. */
@Internal
-public class PulsarPartitionSplit implements SourceSplit {
+public class PulsarPartitionSplit implements SourceSplit, Serializable {
+ private static final long serialVersionUID = -6857317360756062625L;
private final TopicPartition partition;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 2f1faa53672..fa46f9a4cdc 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -24,8 +24,6 @@ import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingC
import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -58,22 +56,4 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
@TestContext
PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new);
-
- @Override
- public void testScaleUp(
- TestEnvironment testEnv,
- DataStreamSourceExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {
- super.testScaleUp(testEnv, externalContext, semantic);
- }
-
- @Override
- public void testScaleDown(
- TestEnvironment testEnv,
- DataStreamSourceExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {
- super.testScaleDown(testEnv, externalContext, semantic);
- }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
index 38c40ed88ef..5f18e8f5131 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
@@ -34,6 +34,7 @@ import java.util.Set;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -51,7 +52,9 @@ class PulsarSourceEnumStateSerializerTest {
Collections.singleton(
new PulsarPartitionSplit(
new TopicPartition(randomAlphabetic(10), 10, createFullRange()),
- StopCursor.defaultStopCursor()));
+ StopCursor.defaultStopCursor(),
+ newMessageId(100L, 23L, 44),
+ null));
Map<Integer, Set<PulsarPartitionSplit>> shared = Collections.singletonMap(5, splits);
Map<Integer, Set<String>> mapping =
ImmutableMap.of(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 1dcbe84ba61..aebb76119df 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -51,6 +53,7 @@ import java.util.stream.Collectors;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest;
import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
@@ -368,6 +371,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
Configuration configuration = operator().config();
configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+ configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
if (enablePeriodicPartitionDiscovery) {
configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L);
} else {
@@ -375,15 +379,15 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
}
SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
- SplitsAssignmentState assignmentState =
- new SplitsAssignmentState(latest(), sourceConfiguration, sourceEnumState);
+ SplitAssigner assigner =
+ SplitAssignerFactory.create(latest(), sourceConfiguration, sourceEnumState);
return new PulsarSourceEnumerator(
subscriber,
StartCursor.earliest(),
new FullRangeGenerator(),
sourceConfiguration,
enumContext,
- assignmentState);
+ assigner);
}
private void registerReader(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
deleted file mode 100644
index ac811c3dddb..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.InstanceOfAssertFactories.map;
-
-/** Unit tests for {@link SplitsAssignmentState}. */
-class SplitsAssignmentStateTest {
-
- private final Set<TopicPartition> partitions =
- Sets.newHashSet(
- new TopicPartition("some-topic", 1, new TopicRange(1, 30)),
- new TopicPartition("some-topic", 2, new TopicRange(31, 60)),
- new TopicPartition("some-topic", 3, new TopicRange(61, MAX_RANGE)),
- new TopicPartition(randomAlphabetic(10), -1, createFullRange()));
-
- @Test
- void assignSplitsForSharedSubscription() {
- SplitsAssignmentState state1 =
- new SplitsAssignmentState(
- StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Shared));
- state1.appendTopicPartitions(partitions);
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
- state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
- assertThat(assignment1)
- .isPresent()
- .get()
- .extracting(SplitsAssignment::assignment)
- .asInstanceOf(map(Integer.class, List.class))
- .hasSize(5)
- .allSatisfy((idx, list) -> assertThat(list).hasSize(4));
-
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
- state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
- assertThat(assignment2).isNotPresent();
-
- // Reassign reader 3.
- state1.putSplitsBackToPendingList(assignment1.get().assignment().get(3), 3);
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment3 =
- state1.assignSplits(Lists.newArrayList(0, 1, 2, 4));
- assertThat(assignment3).isNotPresent();
-
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment4 =
- state1.assignSplits(singletonList(3));
- assertThat(assignment4)
- .isPresent()
- .get()
- .extracting(SplitsAssignment::assignment)
- .asInstanceOf(map(Integer.class, List.class))
- .hasSize(1);
- }
-
- @Test
- void assignSplitsForExclusiveSubscription() {
- SplitsAssignmentState state1 =
- new SplitsAssignmentState(
- StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Exclusive));
- state1.appendTopicPartitions(partitions);
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
- state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
- assertThat(assignment1).isPresent();
- assertThat(assignment1.get().assignment())
- .hasSize(4)
- .allSatisfy((idx, list) -> assertThat(list).hasSize(1));
-
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
- state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
- assertThat(assignment2).isNotPresent();
- }
-
- private SourceConfiguration createConfig(SubscriptionType type) {
- Configuration configuration = new Configuration();
- configuration.set(PULSAR_SUBSCRIPTION_TYPE, type);
-
- return new SourceConfiguration(configuration);
- }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
new file mode 100644
index 00000000000..2e9ada3b741
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link NonSharedSplitAssigner}. */
+class NonSharedSplitAssignerTest extends SplitAssignerTestBase<NonSharedSplitAssigner> {
+
+ /**
+ * With partition discovery disabled, every reader reports that no more splits will come once
+ * all registered partitions have been assigned, including readers that received none.
+ */
+ @Test
+ void noMoreSplits() {
+ NonSharedSplitAssigner assigner = splitAssigner(true);
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ assigner = splitAssigner(false);
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ assigner.registerTopicPartitions(createPartitions("f", 8));
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ // A non-shared assigner hands each partition to one reader only, so once the single
+ // pending partition has gone to reader 1 there is nothing left for reader 3 either.
+ assigner.createAssignment(singletonList(1));
+ assertThat(assigner.noMoreSplits(1)).isTrue();
+ assertThat(assigner.noMoreSplits(3)).isTrue();
+ }
+
+ /** Each partition is assigned to a single reader and is never handed out a second time. */
+ @Test
+ void partitionsAssignment() {
+ NonSharedSplitAssigner assigner = splitAssigner(true);
+ assigner.registerTopicPartitions(createPartitions("d", 4));
+ List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+ // Assignment with initial states: the single partition goes to exactly one reader.
+ Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+ assigner.createAssignment(readers);
+ assertThat(assignment).isPresent();
+ assertThat(assignment.get().assignment()).hasSize(1);
+
+ // Reassignment with same readers: nothing is pending any more.
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isNotPresent();
+
+ // Register new partitions and assign them.
+ assigner.registerTopicPartitions(createPartitions("e", 5));
+ assigner.registerTopicPartitions(createPartitions("f", 1));
+ assigner.registerTopicPartitions(createPartitions("g", 3));
+ assigner.registerTopicPartitions(createPartitions("h", 4));
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isPresent();
+ assertThat(assignment.get().assignment()).hasSize(4);
+
+ // Assign to new readers: every partition has already been assigned once.
+ readers = Arrays.asList(2, 4, 6, 8);
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isNotPresent();
+ }
+
+ @Override
+ protected NonSharedSplitAssigner createAssigner(
+ StopCursor stopCursor,
+ SourceConfiguration sourceConfiguration,
+ PulsarSourceEnumState sourceEnumState) {
+ return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
new file mode 100644
index 00000000000..91584b87688
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link SharedSplitAssigner}. */
+class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> {
+
+ /**
+ * Every reader of a shared assigner receives every partition, so with discovery disabled a
+ * reader only reports that no more splits will come once it has been served itself.
+ */
+ @Test
+ void noMoreSplits() {
+ SharedSplitAssigner assigner = splitAssigner(true);
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ assigner = splitAssigner(false);
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ assigner.registerTopicPartitions(createPartitions("f", 8));
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ // Reader 1 has received the partition, but reader 3 is still owed its own copy.
+ assigner.createAssignment(singletonList(1));
+ assertThat(assigner.noMoreSplits(1)).isTrue();
+ assertThat(assigner.noMoreSplits(3)).isFalse();
+
+ assigner.createAssignment(singletonList(3));
+ assertThat(assigner.noMoreSplits(3)).isTrue();
+ }
+
+ /** Every partition is assigned to every reader, including readers that show up later. */
+ @Test
+ void partitionsAssignment() {
+ SharedSplitAssigner assigner = splitAssigner(true);
+ assigner.registerTopicPartitions(createPartitions("d", 4));
+ List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+ // Assignment with initial states: all four readers receive the partition.
+ Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+ assigner.createAssignment(readers);
+ assertThat(assignment).isPresent();
+ assertThat(assignment.get().assignment()).hasSize(4);
+
+ // Reassignment with same readers: nothing is pending any more.
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isNotPresent();
+
+ // Register a new partition and assign it to all readers.
+ assigner.registerTopicPartitions(createPartitions("e", 5));
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isPresent();
+ assertThat(assignment.get().assignment()).hasSize(4);
+
+ // Assign to new readers: each one receives both partitions registered so far.
+ readers = Arrays.asList(2, 4, 6, 8);
+ assignment = assigner.createAssignment(readers);
+ assertThat(assignment).isPresent();
+ assertThat(assignment.get().assignment())
+ .hasSize(4)
+ .allSatisfy((k, v) -> assertThat(v).hasSize(2));
+ }
+
+ @Override
+ protected SharedSplitAssigner createAssigner(
+ StopCursor stopCursor,
+ SourceConfiguration sourceConfiguration,
+ PulsarSourceEnumState sourceEnumState) {
+ return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
new file mode 100644
index 00000000000..65094014720
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Shared tests and test helpers for {@link SplitAssigner} implementations. */
+abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger {
+ // NOTE(review): the tests below run on JUnit 5 (org.junit.jupiter.api.Test); confirm that
+ // TestLogger takes effect there. Other JUnit 5 tests in this module register
+ // TestLoggerExtension instead.
+
+ /** A partition is reported as new only the first time it is registered. */
+ @Test
+ void registerTopicPartitionsWillOnlyReturnNewPartitions() {
+ T assigner = splitAssigner(true);
+
+ Set<TopicPartition> partitions = createPartitions("persistent://public/default/a", 1);
+ List<TopicPartition> newPartitions = assigner.registerTopicPartitions(partitions);
+ assertThat(newPartitions)
+ .hasSize(1)
+ .first()
+ .hasFieldOrPropertyWithValue("topic", "persistent://public/default/a")
+ .hasFieldOrPropertyWithValue("partitionId", 1);
+
+ // Registering the same partition again yields nothing new.
+ newPartitions = assigner.registerTopicPartitions(partitions);
+ assertThat(newPartitions).isEmpty();
+
+ partitions = createPartitions("persistent://public/default/b", 2);
+ newPartitions = assigner.registerTopicPartitions(partitions);
+ assertThat(newPartitions)
+ .hasSize(1)
+ .first()
+ .hasFieldOrPropertyWithValue("topic", "persistent://public/default/b")
+ .hasFieldOrPropertyWithValue("partitionId", 2);
+ }
+
+ /** No assignment is created when no readers are available. */
+ @Test
+ void noReadersProvideForAssignment() {
+ T assigner = splitAssigner(false);
+ assigner.registerTopicPartitions(createPartitions("c", 5));
+
+ Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+ assigner.createAssignment(emptyList());
+ assertThat(assignment).isNotPresent();
+ }
+
+ /** No assignment is created when no partitions have been registered. */
+ @Test
+ void noPartitionsProvideForAssignment() {
+ T assigner = splitAssigner(true);
+ Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+ assigner.createAssignment(singletonList(4));
+ assertThat(assignment).isNotPresent();
+ }
+
+ /**
+ * Creates a set holding the single full-range partition {@code partitionId} of {@code topic}.
+ */
+ protected Set<TopicPartition> createPartitions(String topic, int partitionId) {
+ return singleton(new TopicPartition(topic, partitionId, createFullRange()));
+ }
+
+ /**
+ * Creates the assigner under test with the default stop cursor and an initial enumerator
+ * state.
+ *
+ * @param discovery whether periodic partition discovery is enabled (1000 ms interval) or
+ * disabled (interval -1)
+ */
+ protected T splitAssigner(boolean discovery) {
+ Configuration configuration = new Configuration();
+ configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, discovery ? 1000L : -1L);
+
+ SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
+ return createAssigner(defaultStopCursor(), sourceConfiguration, initialState());
+ }
+
+ /** Creates the {@link SplitAssigner} implementation under test from the given settings. */
+ protected abstract T createAssigner(
+ StopCursor stopCursor,
+ SourceConfiguration sourceConfiguration,
+ PulsarSourceEnumState sourceEnumState);
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index d003107793a..301368b6f4a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
@@ -43,12 +42,13 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
import static org.assertj.core.api.Assertions.assertThat;
/** Test different implementation of StopCursor. */
-public class StopCursorTest extends PulsarTestSuiteBase {
+class StopCursorTest extends PulsarTestSuiteBase {
@Test
void publishTimeStopCursor() throws IOException {
@@ -64,7 +64,7 @@ public class StopCursorTest extends PulsarTestSuiteBase {
// send the first message and set the stopCursor to filter any late stopCursor
operator()
.sendMessage(
- TopicNameUtils.topicNameWithPartition(topicName, 0),
+ topicNameWithPartition(topicName, 0),
Schema.STRING,
randomAlphanumeric(10));
long currentTimeStamp = System.currentTimeMillis();
@@ -85,12 +85,11 @@ public class StopCursorTest extends PulsarTestSuiteBase {
// send the second message and expect it will not be received
operator()
.sendMessage(
- TopicNameUtils.topicNameWithPartition(topicName, 0),
+ topicNameWithPartition(topicName, 0),
Schema.STRING,
randomAlphanumeric(10));
RecordsWithSplitIds<PulsarMessage<String>> secondResult = splitReader.fetch();
- assertThat(secondResult.nextSplit()).isNotNull();
- assertThat(firstResult.nextRecordFromSplit()).isNull();
+ assertThat(secondResult.nextSplit()).isNull();
assertThat(secondResult.finishedSplits()).isNotEmpty();
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index 538e45826d7..9ffbda74260 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -86,7 +85,7 @@ import static org.assertj.core.api.Assertions.assertThat;
TestOrderlinessExtension.class,
TestLoggerExtension.class,
})
-public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
+abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
@RegisterExtension
PulsarSplitReaderInvocationContextProvider provider =
@@ -138,8 +137,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
// create consumer and seek before split changes
try (Consumer<byte[]> consumer = reader.createPulsarConsumer(partition)) {
// inclusive messageId
- StartCursor startCursor = StartCursor.fromMessageId(startPosition);
- startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer);
+ consumer.seek(startPosition);
} catch (PulsarClientException e) {
sneakyThrow(e);
}
@@ -185,7 +183,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
if (verify) {
assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount);
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
- assertThat(finishedSplits).as("Split should not be marked as finished").hasSize(0);
+ assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty();
} else {
assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1);
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index f238a03bfa5..9eaa24041c7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -24,6 +24,9 @@ import org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
/** Common test context for pulsar based test. */
public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
@@ -39,10 +42,13 @@ public abstract class PulsarTestContext<T> implements DataStreamSourceExternalCo
// Helper methods for generating data.
protected List<String> generateStringTestData(int splitIndex, long seed) {
- int recordNum = 300;
+ Random random = new Random(seed);
+ int recordNum = 300 + random.nextInt(200);
List<String> records = new ArrayList<>(recordNum);
+
for (int i = 0; i < recordNum; i++) {
- records.add(splitIndex + "-" + i);
+ int length = random.nextInt(40) + 10;
+ records.add(splitIndex + "-" + i + "-" + randomAlphanumeric(length));
}
return records;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index b8c49581f75..be387c583b1 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -31,13 +31,14 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -49,10 +50,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -62,9 +61,7 @@ import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
@@ -79,8 +76,10 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
/**
* A pulsar cluster operator used for operating pulsar instance. It's serializable for using in
@@ -178,7 +177,7 @@ public class PulsarRuntimeOperator implements Closeable {
*/
public void createTopic(String topic, int numberOfPartitions) {
checkArgument(numberOfPartitions >= 0);
- if (numberOfPartitions <= 0) {
+ if (numberOfPartitions == 0) {
createNonPartitionedTopic(topic);
} else {
createPartitionedTopic(topic, numberOfPartitions);
@@ -196,7 +195,7 @@ public class PulsarRuntimeOperator implements Closeable {
sneakyAdmin(() -> admin().topics().getPartitionedTopicMetadata(topic));
checkArgument(
metadata.partitions < newPartitionsNum,
- "The new partition size which should exceed previous size.");
+ "The new partition size which should greater than previous size.");
sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, newPartitionsNum));
}
@@ -220,9 +219,11 @@ public class PulsarRuntimeOperator implements Closeable {
return;
}
+ // Close all the available consumers and producers.
removeConsumers(topic);
removeProducers(topic);
- if (metadata.partitions <= 0) {
+
+ if (metadata.partitions == NON_PARTITIONED) {
sneakyAdmin(() -> admin().topics().delete(topicName));
} else {
sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topicName));
@@ -245,22 +246,6 @@ public class PulsarRuntimeOperator implements Closeable {
}
}
- /**
- * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
- * mapping for topic and its partitions.
- */
- public Map<String, List<TopicPartition>> topicsInfo(String... topics) {
- return topicsInfo(Arrays.asList(topics));
- }
-
- /**
- * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
- * mapping for topic and its partitions.
- */
- public Map<String, List<TopicPartition>> topicsInfo(Collection<String> topics) {
- return topics.stream().collect(toMap(identity(), this::topicInfo));
- }
-
/**
* Send a single message to Pulsar, return the message id after the ack from Pulsar.
*
@@ -518,12 +503,13 @@ public class PulsarRuntimeOperator implements Closeable {
topicProducers.computeIfAbsent(
index,
i -> {
- try {
- return client().newProducer(schema).topic(topic).create();
- } catch (PulsarClientException e) {
- sneakyThrow(e);
- return null;
- }
+ ProducerBuilder<T> builder =
+ client().newProducer(schema)
+ .topic(topic)
+ .enableBatching(false)
+ .enableMultiSchema(true);
+
+ return sneakyClient(builder::create);
});
}
@@ -540,19 +526,15 @@ public class PulsarRuntimeOperator implements Closeable {
topicConsumers.computeIfAbsent(
index,
i -> {
- try {
- return client().newConsumer(schema)
- .topic(topic)
- .subscriptionName(SUBSCRIPTION_NAME)
- .subscriptionMode(Durable)
- .subscriptionType(Exclusive)
- .subscriptionInitialPosition(
- SubscriptionInitialPosition.Earliest)
- .subscribe();
- } catch (PulsarClientException e) {
- sneakyThrow(e);
- return null;
- }
+ ConsumerBuilder<T> builder =
+ client().newConsumer(schema)
+ .topic(topic)
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionMode(Durable)
+ .subscriptionType(Exclusive)
+ .subscriptionInitialPosition(Earliest);
+
+ return sneakyClient(builder::subscribe);
});
}
@@ -561,11 +543,7 @@ public class PulsarRuntimeOperator implements Closeable {
ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
if (integerProducers != null) {
for (Producer<?> producer : integerProducers.values()) {
- try {
- producer.close();
- } catch (PulsarClientException e) {
- sneakyThrow(e);
- }
+ sneakyClient(producer::close);
}
}
}
@@ -575,11 +553,7 @@ public class PulsarRuntimeOperator implements Closeable {
ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
if (integerConsumers != null) {
for (Consumer<?> consumer : integerConsumers.values()) {
- try {
- consumer.close();
- } catch (PulsarClientException e) {
- sneakyThrow(e);
- }
+ sneakyClient(consumer::close);
}
}
}