You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/31 11:40:54 UTC
[flink] branch release-1.15 updated: [FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new f6a49a5 [FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change
f6a49a5 is described below
commit f6a49a5627acb10c5c32b7d7e95b79c3a2a1cdd8
Author: Yufei Zhang <af...@gmail.com>
AuthorDate: Wed Jan 12 22:29:37 2022 +0800
[FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change
---
.../docs/connectors/datastream/pulsar.md | 8 +-
docs/content/docs/connectors/datastream/pulsar.md | 10 +-
.../source/enumerator/cursor/StopCursor.java | 19 +++-
.../cursor/start/TimestampStartCursor.java | 2 +-
...opCursor.java => EventTimestampStopCursor.java} | 6 +-
...Cursor.java => PublishTimestampStopCursor.java} | 10 +-
.../source/enumerator/cursor/StopCursorTest.java | 105 +++++++++++++++++++++
7 files changed, 144 insertions(+), 16 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 3ba5989..8d44be8 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -239,11 +239,15 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
```java
StopCursor.afterMessageId(MessageId);
```
-- 停止于某个给定的消息时间戳。
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
```java
- StopCursor.atEventTime(long);
+ StopCursor.atPublishTime(long);
```
+{{< hint warning >}}
+StopCursor.atEventTime(long) 目前已经处于弃用状态。
+{{< /hint >}}
+
### Source 配置项
除了前面提到的配置选项,Pulsar Source 还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 `setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 方法给定下述的全部配置。
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 11e62cc..0953753 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -241,7 +241,7 @@ The Pulsar connector consumes from the latest available message if the message I
```java
StartCursor.fromMessageId(MessageId, boolean);
```
-- Start from the specified message time by `Message<byte[]>.getEventTime()`.
+- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
```java
StartCursor.fromMessageTime(long);
```
@@ -281,11 +281,15 @@ Built-in stop cursors include:
```java
StopCursor.afterMessageId(MessageId);
```
-- Stop at the specified message time by `Message<byte[]>.getEventTime()`.
+- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
```java
- StopCursor.atEventTime(long);
+ StopCursor.atPublishTime(long);
```
+{{< hint warning >}}
+StopCursor.atEventTime(long) is now deprecated.
+ {{< /hint >}}
+
### Source Configurable Options
In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index aaec143..0bf46ce 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -19,10 +19,11 @@
package org.apache.flink.connector.pulsar.source.enumerator.cursor;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.EventTimestampStopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.MessageIdStopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.TimestampStopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.PublishTimestampStopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -63,15 +64,29 @@ public interface StopCursor extends Serializable {
return new LatestMessageStopCursor();
}
+ /**
+ * Stop when the messageId is equal to or greater than the specified messageId. A message
+ * equal to the specified messageId will not be consumed.
+ */
static StopCursor atMessageId(MessageId messageId) {
return new MessageIdStopCursor(messageId);
}
+ /**
+ * Stop when the messageId is greater than the specified messageId. Message that is equal to the
+ * specified messageId will be consumed.
+ */
static StopCursor afterMessageId(MessageId messageId) {
return new MessageIdStopCursor(messageId, false);
}
+ @Deprecated
static StopCursor atEventTime(long timestamp) {
- return new TimestampStopCursor(timestamp);
+ return new EventTimestampStopCursor(timestamp);
+ }
+
+ /** Stop when message publishTime is greater than or equal to the specified timestamp. */
+ static StopCursor atPublishTime(long timestamp) {
+ return new PublishTimestampStopCursor(timestamp);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
index 88f3b5a..eb4ea32 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor.start;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
-/** This cursor would left pulsar start consuming from a specific timestamp. */
+/** This cursor would let pulsar start consuming from a specific publish timestamp. */
public class TimestampStartCursor implements StartCursor {
private static final long serialVersionUID = 5170578885838095320L;
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
similarity index 87%
copy from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
copy to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
index 534e77f..e425545 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
@@ -23,12 +23,12 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.pulsar.client.api.Message;
/** Stop consuming message at the given event time. */
-public class TimestampStopCursor implements StopCursor {
- private static final long serialVersionUID = 3381576769339353027L;
+public class EventTimestampStopCursor implements StopCursor {
+ private static final long serialVersionUID = 2391576769339369027L;
private final long timestamp;
- public TimestampStopCursor(long timestamp) {
+ public EventTimestampStopCursor(long timestamp) {
this.timestamp = timestamp;
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
similarity index 79%
rename from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
rename to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
index 534e77f..b598e7a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
@@ -22,18 +22,18 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.pulsar.client.api.Message;
-/** Stop consuming message at the given event time. */
-public class TimestampStopCursor implements StopCursor {
- private static final long serialVersionUID = 3381576769339353027L;
+/** Stop consuming message at the given publish time. */
+public class PublishTimestampStopCursor implements StopCursor {
+ private static final long serialVersionUID = 4386276745339324527L;
private final long timestamp;
- public TimestampStopCursor(long timestamp) {
+ public PublishTimestampStopCursor(long timestamp) {
this.timestamp = timestamp;
}
@Override
public boolean shouldStop(Message<?> message) {
- return message.getEventTime() >= timestamp;
+ return message.getPublishTime() >= timestamp;
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
new file mode 100644
index 0000000..d003107
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests {@link StopCursor} implementations through a Pulsar split reader. */
+public class StopCursorTest extends PulsarTestSuiteBase {
+
+ @Test
+ void publishTimeStopCursor() throws IOException {
+ String topicName = randomAlphanumeric(5);
+ operator().createTopic(topicName, 2);
+
+ PulsarOrderedPartitionSplitReader<String> splitReader =
+ new PulsarOrderedPartitionSplitReader<>(
+ operator().client(),
+ operator().admin(),
+ sourceConfig(),
+ flinkSchema(new SimpleStringSchema()));
+ // send the first message, then take the current time as the publish-time stop bound
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.STRING,
+ randomAlphanumeric(10));
+ long currentTimeStamp = System.currentTimeMillis();
+ TopicPartition partition = new TopicPartition(topicName, 0, createFullRange());
+ PulsarPartitionSplit split =
+ new PulsarPartitionSplit(
+ partition,
+ StopCursor.atPublishTime(currentTimeStamp),
+ MessageId.earliest,
+ null);
+ SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
+ splitReader.handleSplitsChanges(addition);
+ // the first fetch should return a record and must not mark the split as finished
+ RecordsWithSplitIds<PulsarMessage<String>> firstResult = splitReader.fetch();
+ assertThat(firstResult.nextSplit()).isNotNull();
+ assertThat(firstResult.nextRecordFromSplit()).isNotNull();
+ assertThat(firstResult.finishedSplits()).isEmpty();
+ // send a second message after the stop timestamp was taken; it is expected not to be received
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.STRING,
+ randomAlphanumeric(10));
+ RecordsWithSplitIds<PulsarMessage<String>> secondResult = splitReader.fetch();
+ assertThat(secondResult.nextSplit()).isNotNull();
+ assertThat(firstResult.nextRecordFromSplit()).isNull(); // NOTE(review): checks firstResult, not secondResult - presumably a typo; confirm
+ assertThat(secondResult.finishedSplits()).isNotEmpty();
+ }
+
+ private SourceConfiguration sourceConfig() {
+ Configuration config = operator().config();
+ config.set(PULSAR_MAX_FETCH_RECORDS, 1); // presumably caps each fetch() at one record - TODO confirm
+ config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+ config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); // random subscription name per call
+ config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
+ return new SourceConfiguration(config);
+ }
+}