You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:11 UTC

[flink] 09/09: [FLINK-26038][connector/pulsar] Support delay message on PulsarSink.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07f23e0d3383941ceb475cdd753a59d07100bdf5
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Feb 11 12:19:42 2022 +0800

    [FLINK-26038][connector/pulsar] Support delay message on PulsarSink.
---
 .../flink/connector/pulsar/sink/PulsarSink.java    | 13 ++++-
 .../connector/pulsar/sink/PulsarSinkBuilder.java   | 20 ++++++-
 .../connector/pulsar/sink/writer/PulsarWriter.java | 10 ++++
 .../sink/writer/delayer/FixedMessageDelayer.java   | 43 +++++++++++++++
 .../pulsar/sink/writer/delayer/MessageDelayer.java | 62 ++++++++++++++++++++++
 .../pulsar/sink/PulsarSinkBuilderTest.java         |  1 -
 .../pulsar/sink/writer/PulsarWriterTest.java       |  5 +-
 7 files changed, 149 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
index 811d5b5..4c6c4a9 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSeriali
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
@@ -82,6 +83,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
     private final SinkConfiguration sinkConfiguration;
     private final PulsarSerializationSchema<IN> serializationSchema;
     private final TopicMetadataListener metadataListener;
+    private final MessageDelayer<IN> messageDelayer;
     private final TopicRouter<IN> topicRouter;
 
     PulsarSink(
@@ -89,10 +91,12 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
             PulsarSerializationSchema<IN> serializationSchema,
             TopicMetadataListener metadataListener,
             TopicRoutingMode topicRoutingMode,
-            TopicRouter<IN> topicRouter) {
+            TopicRouter<IN> topicRouter,
+            MessageDelayer<IN> messageDelayer) {
         this.sinkConfiguration = checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.metadataListener = checkNotNull(metadataListener);
+        this.messageDelayer = checkNotNull(messageDelayer);
         checkNotNull(topicRoutingMode);
 
         // Create topic router supplier.
@@ -119,7 +123,12 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
     @Override
     public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext) {
         return new PulsarWriter<>(
-                sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext);
+                sinkConfiguration,
+                serializationSchema,
+                metadataListener,
+                topicRouter,
+                messageDelayer,
+                initContext);
     }
 
     @Internal
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index a0352f5..1668e3d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
@@ -101,6 +102,7 @@ public class PulsarSinkBuilder<IN> {
     private TopicMetadataListener metadataListener;
     private TopicRoutingMode topicRoutingMode;
     private TopicRouter<IN> topicRouter;
+    private MessageDelayer<IN> messageDelayer;
 
     // private builder constructor.
     PulsarSinkBuilder() {
@@ -231,6 +233,17 @@ public class PulsarSinkBuilder<IN> {
     }
 
     /**
+     * Set a message delayer to enable Pulsar's delayed message delivery.
+     *
+     * @param messageDelayer The delayer which defines when to deliver the message to the consumer.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> delaySendingMessage(MessageDelayer<IN> messageDelayer) {
+        this.messageDelayer = checkNotNull(messageDelayer);
+        return this;
+    }
+
+    /**
      * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
      * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
      *
@@ -331,6 +344,10 @@ public class PulsarSinkBuilder<IN> {
             this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
         }
 
+        if (messageDelayer == null) {
+            this.messageDelayer = MessageDelayer.never();
+        }
+
         // This is an unmodifiable configuration for Pulsar.
         // We don't use Pulsar's built-in configure classes for compatible requirement.
         SinkConfiguration sinkConfiguration =
@@ -341,7 +358,8 @@ public class PulsarSinkBuilder<IN> {
                 serializationSchema,
                 metadataListener,
                 topicRoutingMode,
-                topicRouter);
+                topicRouter,
+                messageDelayer);
     }
 
     // ------------- private helpers  --------------
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 9b3c931..1e4113a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
 import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
@@ -67,6 +68,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
     private final PulsarSerializationSchema<IN> serializationSchema;
     private final TopicMetadataListener metadataListener;
     private final TopicRouter<IN> topicRouter;
+    private final MessageDelayer<IN> messageDelayer;
     private final DeliveryGuarantee deliveryGuarantee;
     private final PulsarSinkContext sinkContext;
     private final MailboxExecutor mailboxExecutor;
@@ -92,11 +94,13 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
             PulsarSerializationSchema<IN> serializationSchema,
             TopicMetadataListener metadataListener,
             TopicRouter<IN> topicRouter,
+            MessageDelayer<IN> messageDelayer,
             InitContext initContext) {
         this.sinkConfiguration = checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.metadataListener = checkNotNull(metadataListener);
         this.topicRouter = checkNotNull(topicRouter);
+        this.messageDelayer = checkNotNull(messageDelayer);
         checkNotNull(initContext);
 
         this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
@@ -136,6 +140,12 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
         // Create message builder for sending message.
         TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
 
+        // Message Delay delivery.
+        long deliverAt = messageDelayer.deliverAt(element, sinkContext);
+        if (deliverAt > 0) {
+            builder.deliverAt(deliverAt);
+        }
+
         // Perform message sending.
         if (deliveryGuarantee == DeliveryGuarantee.NONE) {
             // We would just ignore the sending exception. This may cause data loss.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java
new file mode 100644
index 0000000..c11d2f8
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.delayer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+/** A delayer that applies the same fixed delay to every message (non-positive means no delay). */
+@PublicEvolving
+public class FixedMessageDelayer<IN> implements MessageDelayer<IN> {
+    private static final long serialVersionUID = -7550834520312097614L;
+
+    private final long delayDuration;
+
+    public FixedMessageDelayer(long delayDuration) {
+        this.delayDuration = delayDuration;
+    }
+
+    @Override
+    public long deliverAt(IN message, PulsarSinkContext sinkContext) {
+        if (delayDuration > 0) {
+            return sinkContext.processTime() + delayDuration;
+        } else {
+            return delayDuration; // Non-positive value: no delay was requested.
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java
new file mode 100644
index 0000000..53a345b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.delayer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * A delayer deciding when the Pulsar broker delivers a sent message to the downstream consumer.
+ * This only works with the {@link SubscriptionType#Shared} subscription type.
+ *
+ * <p>Read <a
+ * href="https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery">delayed
+ * message delivery</a> for a better understanding of this feature.
+ */
+@PublicEvolving
+public interface MessageDelayer<IN> extends Serializable {
+
+    /**
+     * Return the delivery timestamp for this message. Calculate the timestamp based on {@link
+     * PulsarSinkContext#processTime()}; a non-positive value indicates this message should be
+     * delivered immediately.
+     */
+    long deliverAt(IN message, PulsarSinkContext sinkContext);
+
+    /** Implement this method if you have non-serializable fields to initialize. */
+    default void open(SinkConfiguration sinkConfiguration) {
+        // Nothing to do by default.
+    }
+
+    /** Create a delayer with which all the messages are delivered immediately. */
+    static <IN> FixedMessageDelayer<IN> never() {
+        return new FixedMessageDelayer<>(-1L);
+    }
+
+    /** Create a delayer with which all the messages are delivered after a fixed duration. */
+    static <IN> FixedMessageDelayer<IN> fixed(Duration duration) {
+        return new FixedMessageDelayer<>(duration.toMillis());
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
index 188e718..0e0db88 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test;
 import java.util.Properties;
 
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.CUSTOM;
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.MESSAGE_KEY_HASH;
 import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.ROUND_ROBIN;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 1534fb5..942b759 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.FixedMessageDelayer;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
 import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
@@ -80,10 +82,11 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
         PulsarSerializationSchema<String> schema = pulsarSchema(STRING);
         TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic));
         RoundRobinTopicRouter<String> router = new RoundRobinTopicRouter<>(configuration);
+        FixedMessageDelayer<String> delayer = MessageDelayer.never();
         MockInitContext initContext = new MockInitContext();
 
         PulsarWriter<String> writer =
-                new PulsarWriter<>(configuration, schema, listener, router, initContext);
+                new PulsarWriter<>(configuration, schema, listener, router, delayer, initContext);
 
         writer.flush(false);
         writer.prepareCommit();