You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:07 UTC

[flink] 05/09: [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e72bfede70a00146f466b3e7491fc0f83eb6c41
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Feb 15 22:21:50 2022 +0800

    [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization.
---
 .../pulsar/sink/writer/message/PulsarMessage.java  | 111 ++++++++++++++++++
 .../sink/writer/message/PulsarMessageBuilder.java  | 127 ++++++++++++++++++++
 .../writer/serializer/PulsarSchemaWrapper.java     |  59 ++++++++++
 .../serializer/PulsarSerializationSchema.java      | 129 +++++++++++++++++++++
 .../PulsarSerializationSchemaWrapper.java          |  59 ++++++++++
 5 files changed, 485 insertions(+)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java
new file mode 100644
index 0000000..0c45763
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A plain data holder describing one message to be handed to a Pulsar {@link
+ * TypedMessageBuilder}. Pulsar does not ship such a POJO, so this class fills the gap. Instances
+ * are created through {@link PulsarMessageBuilder}.
+ *
+ * @param <T> The type of the message value, encoded with {@link #getSchema()}.
+ */
+@PublicEvolving
+public class PulsarMessage<T> {
+
+    @Nullable private final byte[] orderingKey;
+    @Nullable private final String key;
+    private final long eventTime;
+    private final Schema<T> schema;
+    @Nullable private final T value;
+    @Nullable private final Map<String, String> properties;
+    @Nullable private final Long sequenceId;
+    @Nullable private final List<String> replicationClusters;
+    private final boolean disableReplication;
+
+    /**
+     * Package private: instances are meant to be created only by {@link PulsarMessageBuilder},
+     * which checks that {@code schema} is not null.
+     */
+    PulsarMessage(
+            @Nullable byte[] orderingKey,
+            @Nullable String key,
+            long eventTime,
+            Schema<T> schema,
+            @Nullable T value,
+            @Nullable Map<String, String> properties,
+            @Nullable Long sequenceId,
+            @Nullable List<String> replicationClusters,
+            boolean disableReplication) {
+        this.schema = schema;
+        this.value = value;
+        this.key = key;
+        this.orderingKey = orderingKey;
+        this.eventTime = eventTime;
+        this.properties = properties;
+        this.sequenceId = sequenceId;
+        this.replicationClusters = replicationClusters;
+        this.disableReplication = disableReplication;
+    }
+
+    /** The ordering key (see {@link TypedMessageBuilder#orderingKey(byte[])}), or null. */
+    @Nullable
+    public byte[] getOrderingKey() {
+        return this.orderingKey;
+    }
+
+    /** The message key (see {@link TypedMessageBuilder#key(String)}), or null. */
+    @Nullable
+    public String getKey() {
+        return this.key;
+    }
+
+    /** The event time (see {@link TypedMessageBuilder#eventTime(long)}); 0 if unset. */
+    public long getEventTime() {
+        return this.eventTime;
+    }
+
+    /** The schema used to encode {@link #getValue()}. */
+    public Schema<T> getSchema() {
+        return this.schema;
+    }
+
+    /** The message value, or null for a tombstone message. */
+    @Nullable
+    public T getValue() {
+        return this.value;
+    }
+
+    /** User properties (see {@link TypedMessageBuilder#properties(Map)}), or null. */
+    @Nullable
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    /** The sequence id (see {@link TypedMessageBuilder#sequenceId(long)}), or null. */
+    @Nullable
+    public Long getSequenceId() {
+        return this.sequenceId;
+    }
+
+    /** Clusters (see {@link TypedMessageBuilder#replicationClusters(List)}), or null. */
+    @Nullable
+    public List<String> getReplicationClusters() {
+        return this.replicationClusters;
+    }
+
+    /** Whether {@link TypedMessageBuilder#disableReplication()} should be applied. */
+    public boolean isDisableReplication() {
+        return this.disableReplication;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java
new file mode 100644
index 0000000..9330d09
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder for {@link PulsarMessage}, exposing the {@link TypedMessageBuilder} methods that
+ * end-users need. Every setter returns this builder so calls can be chained. A schema must be
+ * supplied through {@link #value(Schema, Object)} before {@link #build()} is called.
+ *
+ * @param <T> The type of the message value.
+ */
+@PublicEvolving
+public class PulsarMessageBuilder<T> {
+
+    private byte[] orderingKey;
+    private String key;
+    private long eventTime;
+    private Schema<T> schema;
+    private T value;
+    private Map<String, String> properties = new HashMap<>();
+    private Long sequenceId;
+    private List<String> replicationClusters;
+    private boolean disableReplication = false;
+
+    /** Method wrapper of {@link TypedMessageBuilder#orderingKey(byte[])}. */
+    public PulsarMessageBuilder<T> orderingKey(byte[] orderingKey) {
+        this.orderingKey = checkNotNull(orderingKey);
+        return this;
+    }
+
+    /**
+     * Method wrapper of {@link TypedMessageBuilder#key(String)}. The key is also used by {@link
+     * KeyHashTopicRouter}.
+     */
+    public PulsarMessageBuilder<T> key(String key) {
+        this.key = checkNotNull(key);
+        // Return this builder, like every other setter, so that calls can be chained.
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#eventTime(long)}. */
+    public PulsarMessageBuilder<T> eventTime(long eventTime) {
+        this.eventTime = eventTime;
+        return this;
+    }
+
+    /**
+     * Method wrapper of {@link TypedMessageBuilder#value(Object)}. Any schema can be passed here
+     * and Pulsar will validate it; this is called schema evolution. Note that a Pulsar topic is
+     * bound to a fixed {@link Schema}: a topic can only have multiple schemas if they are
+     * compatible with each other.
+     *
+     * @param schema The schema used to encode the value, must not be null.
+     * @param value The value, may be null, which is called a tombstone message in Pulsar. (It
+     *     will be skipped and considered deleted.)
+     */
+    public PulsarMessageBuilder<T> value(Schema<T> schema, T value) {
+        this.schema = checkNotNull(schema);
+        this.value = value;
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#property(String, String)}. */
+    public PulsarMessageBuilder<T> property(String key, String value) {
+        this.properties.put(checkNotNull(key), checkNotNull(value));
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#properties(Map)}. */
+    public PulsarMessageBuilder<T> properties(Map<String, String> properties) {
+        this.properties.putAll(checkNotNull(properties));
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#sequenceId(long)}. */
+    public PulsarMessageBuilder<T> sequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#replicationClusters(List)}. */
+    public PulsarMessageBuilder<T> replicationClusters(List<String> replicationClusters) {
+        this.replicationClusters = checkNotNull(replicationClusters);
+        return this;
+    }
+
+    /** Method wrapper of {@link TypedMessageBuilder#disableReplication()}. */
+    public PulsarMessageBuilder<T> disableReplication() {
+        this.disableReplication = true;
+        return this;
+    }
+
+    /**
+     * Creates a {@link PulsarMessage} from the values set so far.
+     *
+     * @return A new message. Its property map is a copy, so later changes made through this
+     *     builder do not affect it.
+     * @throws NullPointerException If no schema was set via {@link #value(Schema, Object)}.
+     */
+    public PulsarMessage<T> build() {
+        checkNotNull(schema, "Schema should be provided.");
+
+        return new PulsarMessage<>(
+                orderingKey,
+                key,
+                eventTime,
+                schema,
+                value,
+                // Copy the map so reusing this builder cannot mutate an already built message.
+                new HashMap<>(properties),
+                sequenceId,
+                replicationClusters,
+                disableReplication);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
new file mode 100644
index 0000000..0d5aaf0
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Wraps a Pulsar {@link Schema} (held by a {@link PulsarSchema}) into a {@link
+ * PulsarSerializationSchema}. Schema evolution is supported out of the box and is used whenever
+ * {@link PulsarSinkContext#isEnableSchemaEvolution()} returns true.
+ */
+@Internal
+public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+    private static final long serialVersionUID = -2567052498398184194L;
+
+    /** Holds the Pulsar schema applied to every incoming element. */
+    private final PulsarSchema<IN> pulsarSchema;
+
+    public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) {
+        this.pulsarSchema = pulsarSchema;
+    }
+
+    /**
+     * With schema evolution enabled, the element is sent as-is together with its Pulsar schema.
+     * Otherwise it is encoded here and sent as raw bytes with {@link Schema#BYTES}.
+     */
+    @Override
+    public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) {
+        Schema<IN> elementSchema = pulsarSchema.getPulsarSchema();
+        if (!sinkContext.isEnableSchemaEvolution()) {
+            byte[] payload = elementSchema.encode(element);
+            return new PulsarMessageBuilder<byte[]>().value(Schema.BYTES, payload).build();
+        }
+        return new PulsarMessageBuilder<IN>().value(elementSchema, element).build();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
new file mode 100644
index 0000000..da7f706
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.schema.KeyValue;
+
+import java.io.Serializable;
+
+/**
+ * Describes how records of type {@code IN} are turned into {@link PulsarMessage}s before being
+ * written to Pulsar.
+ *
+ * @param <IN> The type of the records sent to Pulsar.
+ */
+@PublicEvolving
+public interface PulsarSerializationSchema<IN> extends Serializable {
+
+    /**
+     * Initialization method for the schema. It is called before the actual working method {@link
+     * #serialize(Object, PulsarSinkContext)} and is thus suitable for one-time setup work.
+     *
+     * <p>The provided {@link InitializationContext} can be used to access additional features such
+     * as registering user metrics. The default implementation does nothing.
+     *
+     * @param initializationContext Contextual information that can be used during initialization.
+     * @param sinkContext Runtime information, e.g. partitions and subtask id.
+     * @param sinkConfiguration All the configuration options of the Pulsar sink. Custom options
+     *     can be added as well.
+     */
+    default void open(
+            InitializationContext initializationContext,
+            PulsarSinkContext sinkContext,
+            SinkConfiguration sinkConfiguration)
+            throws Exception {
+        // No-op unless overridden.
+    }
+
+    /**
+     * Serializes the given element, either into bytes sent with {@link Schema#BYTES} or into an
+     * instance of another type paired with a matching {@link Schema}. The returned {@link
+     * PulsarMessage} can be created with {@link PulsarMessageBuilder}, whose methods mirror the
+     * ones of {@link TypedMessageBuilder}.
+     *
+     * @param element Element to be serialized.
+     * @param sinkContext Context providing extra information.
+     * @return The message to be sent to Pulsar.
+     */
+    PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext);
+
+    /**
+     * Creates a {@link PulsarSerializationSchema} from a Flink {@link SerializationSchema}.
+     * Records are serialized into byte arrays and sent to Pulsar with {@link Schema#BYTES}.
+     */
+    static <T> PulsarSerializationSchema<T> flinkSchema(
+            SerializationSchema<T> serializationSchema) {
+        return new PulsarSerializationSchemaWrapper<>(serializationSchema);
+    }
+
+    /**
+     * Creates a {@link PulsarSerializationSchema} from a Pulsar {@link Schema} for <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+     * types</a>.
+     *
+     * <p>By default, records are encoded with the given schema and sent as {@link Schema#BYTES}.
+     * Sending them with the given schema itself requires {@link
+     * PulsarSinkBuilder#enableSchemaEvolution()}.
+     */
+    static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema) {
+        return new PulsarSchemaWrapper<>(new PulsarSchema<T>(schema));
+    }
+
+    /**
+     * Creates a {@link PulsarSerializationSchema} from a Pulsar {@link Schema} for <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a>.
+     *
+     * <p>By default, records are encoded with the given schema and sent as {@link Schema#BYTES}.
+     * Sending them with the given schema itself requires {@link
+     * PulsarSinkBuilder#enableSchemaEvolution()}.
+     */
+    static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema, Class<T> typeClass) {
+        return new PulsarSchemaWrapper<>(new PulsarSchema<T>(schema, typeClass));
+    }
+
+    /**
+     * Creates a {@link PulsarSerializationSchema} from a Pulsar {@link Schema} for <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">key/value types</a>.
+     *
+     * <p>By default, records are encoded with the given schema and sent as {@link Schema#BYTES}.
+     * Sending them with the given schema itself requires {@link
+     * PulsarSinkBuilder#enableSchemaEvolution()}.
+     */
+    static <K, V> PulsarSerializationSchema<KeyValue<K, V>> pulsarSchema(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        return new PulsarSchemaWrapper<>(
+                new PulsarSchema<KeyValue<K, V>>(schema, keyClass, valueClass));
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java
new file mode 100644
index 0000000..716d2db
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Adapts a Flink {@link SerializationSchema} to the {@link PulsarSerializationSchema} interface.
+ * Every record is serialized into a byte array and sent with {@link Schema#BYTES}.
+ */
+@Internal
+public class PulsarSerializationSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+    private static final long serialVersionUID = 4948155843623161119L;
+
+    /** The wrapped Flink schema that produces the message payload. */
+    private final SerializationSchema<IN> serializationSchema;
+
+    public PulsarSerializationSchemaWrapper(SerializationSchema<IN> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    /** Opens the wrapped Flink schema; the sink context and configuration are not used. */
+    @Override
+    public void open(
+            InitializationContext initializationContext,
+            PulsarSinkContext sinkContext,
+            SinkConfiguration sinkConfiguration)
+            throws Exception {
+        this.serializationSchema.open(initializationContext);
+    }
+
+    /** Serializes the element and wraps the bytes as {@link Schema#BYTES}. */
+    @Override
+    public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) {
+        byte[] payload = this.serializationSchema.serialize(element);
+        return new PulsarMessageBuilder<byte[]>().value(Schema.BYTES, payload).build();
+    }
+}