You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/24 16:03:52 UTC
[flink] branch release-1.14 updated: [FLINK-26160][pulsar][doc] update the doc of setUnboundedStopCursor()
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 3c830d0 [FLINK-26160][pulsar][doc] update the doc of setUnboundedStopCursor()
3c830d0 is described below
commit 3c830d02bbf763c9fe2fa3391cd17e078106ce1d
Author: Yufei Zhang <af...@gmail.com>
AuthorDate: Tue Dec 21 23:12:16 2021 +0800
[FLINK-26160][pulsar][doc] update the doc of setUnboundedStopCursor()
---
.../pulsar/source/PulsarSourceBuilder.java | 30 ++++++++++++++--------
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index e539a0c..6e9cfc0 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -86,11 +86,17 @@ import static org.apache.flink.util.Preconditions.checkState;
* <p>To specify the starting position of PulsarSource, one can call {@link
* #setStartCursor(StartCursor)}.
*
- * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
- * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * <p>By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link
* Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
- * #setUnboundedStopCursor(StopCursor)}. For example the following PulsarSource stops after it
- * consumes up to a event time when the Flink started.
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below. For
+ * example the following PulsarSource stops after it consumes up to a event time when the Flink
+ * started.
+ *
+ * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
+ * discovery always expected new splits to come and not exiting. To disable auto partition
+ * discovery, use builder.setConfig({@link
+ * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
*
* <pre>{@code
* PulsarSource<String> source = PulsarSource
@@ -100,7 +106,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* .setSubscriptionName("flink-source-1")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
- * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ * .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
* .build();
* }</pre>
*
@@ -300,17 +306,21 @@ public final class PulsarSourceBuilder<OUT> {
}
/**
- * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
- * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run as
- * a streaming source but still stops at some point, one can set an {@link StopCursor} to
- * specify the stopping offsets for each partition. When all the partitions have reached their
- * stopping offsets, the PulsarSource will then exit.
+ * By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below.
*
* <p>This method is different from {@link #setBoundedStopCursor(StopCursor)} that after setting
* the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will still
* return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping
* offsets specified by the stopping offsets {@link StopCursor}.
*
+ * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
+ * discovery always expected new splits to come and not exiting. To disable auto partition
+ * discovery, use builder.setConfig({@link
+ * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
* @param stopCursor The {@link StopCursor} to specify the stopping offset.
* @return this PulsarSourceBuilder.
* @see #setBoundedStopCursor(StopCursor)