You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:11 UTC
[flink] 09/09: [FLINK-26038][connector/pulsar] Support delayed message delivery in PulsarSink.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07f23e0d3383941ceb475cdd753a59d07100bdf5
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Feb 11 12:19:42 2022 +0800
[FLINK-26038][connector/pulsar] Support delayed message delivery in PulsarSink.
---
.../flink/connector/pulsar/sink/PulsarSink.java | 13 ++++-
.../connector/pulsar/sink/PulsarSinkBuilder.java | 20 ++++++-
.../connector/pulsar/sink/writer/PulsarWriter.java | 10 ++++
.../sink/writer/delayer/FixedMessageDelayer.java | 43 +++++++++++++++
.../pulsar/sink/writer/delayer/MessageDelayer.java | 62 ++++++++++++++++++++++
.../pulsar/sink/PulsarSinkBuilderTest.java | 1 -
.../pulsar/sink/writer/PulsarWriterTest.java | 5 +-
7 files changed, 149 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
index 811d5b5..4c6c4a9 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSeriali
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
@@ -82,6 +83,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
private final SinkConfiguration sinkConfiguration;
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
+ private final MessageDelayer<IN> messageDelayer;
private final TopicRouter<IN> topicRouter;
PulsarSink(
@@ -89,10 +91,12 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
PulsarSerializationSchema<IN> serializationSchema,
TopicMetadataListener metadataListener,
TopicRoutingMode topicRoutingMode,
- TopicRouter<IN> topicRouter) {
+ TopicRouter<IN> topicRouter,
+ MessageDelayer<IN> messageDelayer) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
+ this.messageDelayer = checkNotNull(messageDelayer);
checkNotNull(topicRoutingMode);
// Create topic router supplier.
@@ -119,7 +123,12 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
@Override
public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext) {
return new PulsarWriter<>(
- sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext);
+ sinkConfiguration,
+ serializationSchema,
+ metadataListener,
+ topicRouter,
+ messageDelayer, // the writer asks this for each message's delivery time
+ initContext);
}
@Internal
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index a0352f5..1668e3d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
@@ -101,6 +102,7 @@ public class PulsarSinkBuilder<IN> {
private TopicMetadataListener metadataListener;
private TopicRoutingMode topicRoutingMode;
private TopicRouter<IN> topicRouter;
+ private MessageDelayer<IN> messageDelayer;
// private builder constructor.
PulsarSinkBuilder() {
@@ -231,6 +233,17 @@ public class PulsarSinkBuilder<IN> {
}
/**
+ * Sets a message delayer to enable Pulsar delayed message delivery; by default no delay is applied.
+ *
+ * @param messageDelayer The delayer that decides when each message is delivered to consumers.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> delaySendingMessage(MessageDelayer<IN> messageDelayer) {
+ this.messageDelayer = checkNotNull(messageDelayer);
+ return this;
+ }
+
+ /**
* Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
* in {@link PulsarSinkOptions} and {@link PulsarOptions}.
*
@@ -331,6 +344,10 @@ public class PulsarSinkBuilder<IN> {
this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
}
+ if (messageDelayer == null) {
+ this.messageDelayer = MessageDelayer.never();
+ }
+
// This is an unmodifiable configuration for Pulsar.
// We don't use Pulsar's built-in configure classes for compatible requirement.
SinkConfiguration sinkConfiguration =
@@ -341,7 +358,8 @@ public class PulsarSinkBuilder<IN> {
serializationSchema,
metadataListener,
topicRoutingMode,
- topicRouter);
+ topicRouter,
+ messageDelayer);
}
// ------------- private helpers --------------
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 9b3c931..1e4113a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
@@ -67,6 +68,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRouter<IN> topicRouter;
+ private final MessageDelayer<IN> messageDelayer;
private final DeliveryGuarantee deliveryGuarantee;
private final PulsarSinkContext sinkContext;
private final MailboxExecutor mailboxExecutor;
@@ -92,11 +94,13 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
PulsarSerializationSchema<IN> serializationSchema,
TopicMetadataListener metadataListener,
TopicRouter<IN> topicRouter,
+ MessageDelayer<IN> messageDelayer,
InitContext initContext) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRouter = checkNotNull(topicRouter);
+ this.messageDelayer = checkNotNull(messageDelayer);
checkNotNull(initContext);
this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
@@ -136,6 +140,12 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
// Create message builder for sending message.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
+ // Message Delay delivery.
+ long deliverAt = messageDelayer.deliverAt(element, sinkContext);
+ if (deliverAt > 0) {
+ builder.deliverAt(deliverAt);
+ }
+
// Perform message sending.
if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// We would just ignore the sending exception. This may cause data loss.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java
new file mode 100644
index 0000000..c11d2f8
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/FixedMessageDelayer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.delayer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+/** A {@link MessageDelayer} that delays every message by the same fixed duration. */
+@PublicEvolving
+public class FixedMessageDelayer<IN> implements MessageDelayer<IN> {
+ private static final long serialVersionUID = -7550834520312097614L;
+ // Delay in milliseconds (see MessageDelayer#fixed); a non-positive value means no delay.
+ private final long delayDuration;
+
+ public FixedMessageDelayer(long delayDuration) {
+ this.delayDuration = delayDuration;
+ }
+
+ @Override
+ public long deliverAt(IN message, PulsarSinkContext sinkContext) {
+ if (delayDuration > 0) {
+ return sinkContext.processTime() + delayDuration; // absolute delivery timestamp
+ } else {
+ return delayDuration; // non-positive: the writer sends without any delay
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java
new file mode 100644
index 0000000..53a345b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/delayer/MessageDelayer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.delayer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * Decides when the Pulsar broker dispatches a sent message to downstream consumers. This only
+ * works with the {@link SubscriptionType#Shared} subscription type.
+ *
+ * <p>Read <a
+ * href="https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery">delayed
+ * message delivery</a> for a better understanding of this feature.
+ */
+@PublicEvolving
+public interface MessageDelayer<IN> extends Serializable {
+
+ /**
+ * Returns the timestamp at which this message should be delivered. Compute it from {@link
+ * PulsarSinkContext#processTime()}; a non-positive value means the message is delivered
+ * immediately.
+ */
+ long deliverAt(IN message, PulsarSinkContext sinkContext);
+
+ /** Override to initialize non-serializable fields from the sink configuration. */
+ default void open(SinkConfiguration sinkConfiguration) {
+ // Nothing to do by default.
+ }
+
+ /** Returns a delayer that never delays: every message is delivered immediately. */
+ static <IN> FixedMessageDelayer<IN> never() {
+ return new FixedMessageDelayer<>(-1L);
+ }
+
+ /** Returns a delayer that delays every message by {@code duration}, truncated to millis. */
+ static <IN> FixedMessageDelayer<IN> fixed(Duration duration) {
+ return new FixedMessageDelayer<>(duration.toMillis());
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
index 188e718..0e0db88 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.CUSTOM;
import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.MESSAGE_KEY_HASH;
import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.ROUND_ROBIN;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 1534fb5..942b759 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.FixedMessageDelayer;
+import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
@@ -80,10 +82,11 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
PulsarSerializationSchema<String> schema = pulsarSchema(STRING);
TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic));
RoundRobinTopicRouter<String> router = new RoundRobinTopicRouter<>(configuration);
+ FixedMessageDelayer<String> delayer = MessageDelayer.never();
MockInitContext initContext = new MockInitContext();
PulsarWriter<String> writer =
- new PulsarWriter<>(configuration, schema, listener, router, initContext);
+ new PulsarWriter<>(configuration, schema, listener, router, delayer, initContext);
writer.flush(false);
writer.prepareCommit();