You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/30 13:47:15 UTC

[flink] branch master updated: [FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b411e34  [FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change
b411e34 is described below

commit b411e34a9e996f697f9bc1f3cc571032efea38bf
Author: Yufei Zhang <af...@gmail.com>
AuthorDate: Wed Jan 12 22:29:37 2022 +0800

    [FLINK-25440][doc][pulsar] Stop and Start cursor now all uses publishTime instead of eventTime; doc changed to reflect this change
---
 .../docs/connectors/datastream/pulsar.md           |   8 +-
 docs/content/docs/connectors/datastream/pulsar.md  |  10 +-
 .../source/enumerator/cursor/StopCursor.java       |  19 +++-
 .../cursor/start/TimestampStartCursor.java         |   2 +-
 ...opCursor.java => EventTimestampStopCursor.java} |   6 +-
 ...Cursor.java => PublishTimestampStopCursor.java} |  10 +-
 .../source/enumerator/cursor/StopCursorTest.java   | 105 +++++++++++++++++++++
 7 files changed, 144 insertions(+), 16 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 3ba5989..8d44be8 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -239,11 +239,15 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
-- 停止于某个给定的消息时间戳。
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
   ```java
-  StopCursor.atEventTime(long);
+  StopCursor.atPublishTime(long);
   ```
 
+{{< hint warning >}}
+`StopCursor.atEventTime(long)` 目前已经处于弃用状态,请改用 `StopCursor.atPublishTime(long)`。
+{{< /hint >}}
+
 ### Source 配置项
 
 除了前面提到的配置选项,Pulsar Source 还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 `setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 方法给定下述的全部配置。
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 11e62cc..0953753 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -241,7 +241,7 @@ The Pulsar connector consumes from the latest available message if the message I
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
-- Start from the specified message time by `Message<byte[]>.getEventTime()`.
+- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
   ```java
   StartCursor.fromMessageTime(long);
   ```
@@ -281,11 +281,15 @@ Built-in stop cursors include:
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
-- Stop at the specified message time by `Message<byte[]>.getEventTime()`.
+- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
   ```java
-  StopCursor.atEventTime(long);
+  StopCursor.atPublishTime(long);
   ```
 
+{{< hint warning >}}
+`StopCursor.atEventTime(long)` is now deprecated. Use `StopCursor.atPublishTime(long)` instead.
+{{< /hint >}}
+
 ### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index aaec143..0bf46ce 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -19,10 +19,11 @@
 package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.EventTimestampStopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.MessageIdStopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.TimestampStopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.PublishTimestampStopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -63,15 +64,29 @@ public interface StopCursor extends Serializable {
         return new LatestMessageStopCursor();
     }
 
+    /**
+     * Stop when the messageId is equal to or greater than the specified messageId. The message
+     * equal to the specified messageId will not be consumed.
+     */
     static StopCursor atMessageId(MessageId messageId) {
         return new MessageIdStopCursor(messageId);
     }
 
+    /**
+     * Stop when the messageId is greater than the specified messageId. The message equal to the
+     * specified messageId will be consumed.
+     */
     static StopCursor afterMessageId(MessageId messageId) {
         return new MessageIdStopCursor(messageId, false);
     }
 
+    @Deprecated // Use atPublishTime(long) instead.
     static StopCursor atEventTime(long timestamp) {
-        return new TimestampStopCursor(timestamp);
+        return new EventTimestampStopCursor(timestamp);
+    }
+
+    /** Stop when the message publishTime is greater than or equal to the specified timestamp. */
     static StopCursor atPublishTime(long timestamp) {
         return new PublishTimestampStopCursor(timestamp);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
index 88f3b5a..eb4ea32 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor.start;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
-/** This cursor would left pulsar start consuming from a specific timestamp. */
+/** This cursor lets Pulsar start consuming from a specific publish timestamp. */
 public class TimestampStartCursor implements StartCursor {
     private static final long serialVersionUID = 5170578885838095320L;
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
similarity index 87%
copy from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
copy to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
index 534e77f..e425545 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
@@ -23,12 +23,12 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.pulsar.client.api.Message;
 
 /** Stop consuming message at the given event time. */
-public class TimestampStopCursor implements StopCursor {
-    private static final long serialVersionUID = 3381576769339353027L;
+public class EventTimestampStopCursor implements StopCursor {
+    private static final long serialVersionUID = 2391576769339369027L;
 
     private final long timestamp;
 
-    public TimestampStopCursor(long timestamp) {
+    public EventTimestampStopCursor(long timestamp) {
         this.timestamp = timestamp;
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
similarity index 79%
rename from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
rename to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
index 534e77f..b598e7a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
@@ -22,18 +22,18 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 
 import org.apache.pulsar.client.api.Message;
 
-/** Stop consuming message at the given event time. */
-public class TimestampStopCursor implements StopCursor {
-    private static final long serialVersionUID = 3381576769339353027L;
+/** Stop consuming messages once a message's publish time is at or after the given timestamp. */
+public class PublishTimestampStopCursor implements StopCursor {
+    private static final long serialVersionUID = 4386276745339324527L;
 
     private final long timestamp;
 
-    public TimestampStopCursor(long timestamp) {
+    public PublishTimestampStopCursor(long timestamp) {
         this.timestamp = timestamp;
     }
 
     @Override
     public boolean shouldStop(Message<?> message) {
-        return message.getEventTime() >= timestamp;
+        return message.getPublishTime() >= timestamp;
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
new file mode 100644
index 0000000..d003107
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the different {@link StopCursor} implementations. */
+public class StopCursorTest extends PulsarTestSuiteBase {
+
+    @Test
+    void publishTimeStopCursor() throws IOException {
+        String topicName = randomAlphanumeric(5);
+        operator().createTopic(topicName, 2);
+
+        PulsarOrderedPartitionSplitReader<String> splitReader =
+                new PulsarOrderedPartitionSplitReader<>(
+                        operator().client(),
+                        operator().admin(),
+                        sourceConfig(),
+                        flinkSchema(new SimpleStringSchema()));
+        // send the first message before capturing the timestamp used by the stop cursor
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.STRING,
+                        randomAlphanumeric(10));
+        long currentTimeStamp = System.currentTimeMillis();
+        TopicPartition partition = new TopicPartition(topicName, 0, createFullRange());
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(
+                        partition,
+                        StopCursor.atPublishTime(currentTimeStamp),
+                        MessageId.earliest,
+                        null);
+        SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
+        splitReader.handleSplitsChanges(addition);
+        // the first fetch returns the first message and does not finish the split
+        RecordsWithSplitIds<PulsarMessage<String>> firstResult = splitReader.fetch();
+        assertThat(firstResult.nextSplit()).isNotNull();
+        assertThat(firstResult.nextRecordFromSplit()).isNotNull();
+        assertThat(firstResult.finishedSplits()).isEmpty();
+        // the second message reaches the stop timestamp: it is emitted and the split finishes
+        operator()
+                .sendMessage(
+                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        Schema.STRING,
+                        randomAlphanumeric(10));
+        RecordsWithSplitIds<PulsarMessage<String>> secondResult = splitReader.fetch();
+        assertThat(secondResult.nextSplit()).isNotNull();
+        assertThat(secondResult.nextRecordFromSplit()).isNotNull();
+        assertThat(secondResult.finishedSplits()).isNotEmpty();
+    }
+
+    private SourceConfiguration sourceConfig() {
+        Configuration config = operator().config();
+        config.set(PULSAR_MAX_FETCH_RECORDS, 1);
+        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
+        config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
+        return new SourceConfiguration(config);
+    }
+}