You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:07 UTC
[flink] 05/09: [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e72bfede70a00146f466b3e7491fc0f83eb6c41
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Feb 15 22:21:50 2022 +0800
[FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization.
---
.../pulsar/sink/writer/message/PulsarMessage.java | 111 ++++++++++++++++++
.../sink/writer/message/PulsarMessageBuilder.java | 127 ++++++++++++++++++++
.../writer/serializer/PulsarSchemaWrapper.java | 59 ++++++++++
.../serializer/PulsarSerializationSchema.java | 129 +++++++++++++++++++++
.../PulsarSerializationSchemaWrapper.java | 59 ++++++++++
5 files changed, 485 insertions(+)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java
new file mode 100644
index 0000000..0c45763
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The message instance that is used for {@link TypedMessageBuilder}; a plain holder of the message
+ * fields. Pulsar itself lacks such a POJO class, which is why this class was created.
+ */
+@PublicEvolving
+public class PulsarMessage<T> {
+
+    private final Schema<T> schema;
+    @Nullable private final T value;
+    @Nullable private final String key;
+    @Nullable private final byte[] orderingKey;
+    private final long eventTime;
+    @Nullable private final Long sequenceId;
+    @Nullable private final Map<String, String> properties;
+    @Nullable private final List<String> replicationClusters;
+    private final boolean disableReplication;
+
+    /** Package private; instances are only built in {@link PulsarMessageBuilder}. */
+    PulsarMessage(
+            @Nullable byte[] orderingKey,
+            @Nullable String key,
+            long eventTime,
+            Schema<T> schema,
+            @Nullable T value,
+            @Nullable Map<String, String> properties,
+            @Nullable Long sequenceId,
+            @Nullable List<String> replicationClusters,
+            boolean disableReplication) {
+        this.schema = schema;
+        this.value = value;
+        this.key = key;
+        this.orderingKey = orderingKey;
+        this.eventTime = eventTime;
+        this.sequenceId = sequenceId;
+        this.properties = properties;
+        this.replicationClusters = replicationClusters;
+        this.disableReplication = disableReplication;
+    }
+
+    public Schema<T> getSchema() {
+        return schema;
+    }
+
+    @Nullable
+    public T getValue() {
+        return value;
+    }
+
+    @Nullable
+    public String getKey() {
+        return key;
+    }
+
+    @Nullable
+    public byte[] getOrderingKey() {
+        return orderingKey;
+    }
+
+    public long getEventTime() {
+        return eventTime;
+    }
+
+    @Nullable
+    public Long getSequenceId() {
+        return sequenceId;
+    }
+
+    @Nullable
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Nullable
+    public List<String> getReplicationClusters() {
+        return replicationClusters;
+    }
+
+    public boolean isDisableReplication() {
+        return disableReplication;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java
new file mode 100644
index 0000000..9330d09
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Fluent {@link PulsarMessage} builder that wraps the {@link TypedMessageBuilder} methods. */
+@PublicEvolving
+public class PulsarMessageBuilder<T> {
+
+    private byte[] orderingKey;
+    private String key;
+    private long eventTime;
+    private Schema<T> schema;
+    private T value;
+    private Map<String, String> properties = new HashMap<>();
+    private Long sequenceId;
+    private List<String> replicationClusters;
+    private boolean disableReplication = false;
+
+    /** Non-null ordering key; wraps {@link TypedMessageBuilder#orderingKey(byte[])}. */
+    public PulsarMessageBuilder<T> orderingKey(byte[] orderingKey) {
+        this.orderingKey = checkNotNull(orderingKey);
+        return this;
+    }
+
+    /**
+     * Sets the non-null message key, see {@link TypedMessageBuilder#key(String)}. This property
+     * would also be used in {@link KeyHashTopicRouter}. Returns this builder for chaining.
+     */
+    public PulsarMessageBuilder<T> key(String key) {
+        this.key = checkNotNull(key);
+        return this;
+    }
+
+    /** Event time of the message; wraps {@link TypedMessageBuilder#eventTime(long)}. */
+    public PulsarMessageBuilder<T> eventTime(long eventTime) {
+        this.eventTime = eventTime;
+        return this;
+    }
+
+    /**
+     * Sets the value and its schema, see {@link TypedMessageBuilder#value(Object)}. Any schema
+     * can be passed and is validated by Pulsar (schema evolution), but a topic is bound to one
+     * fixed {@link Schema}; multiple schemas on a topic must be compatible with each other.
+     *
+     * @param schema The schema of the value, must not be null.
+     * @param value The value could be null, which is called tombstones message in Pulsar. (It will
+     *     be skipped and considered deleted.)
+     */
+    public PulsarMessageBuilder<T> value(Schema<T> schema, T value) {
+        this.schema = checkNotNull(schema);
+        this.value = value;
+        return this;
+    }
+
+    /** Adds one property; wraps {@link TypedMessageBuilder#property(String, String)}. */
+    public PulsarMessageBuilder<T> property(String key, String value) {
+        this.properties.put(checkNotNull(key), checkNotNull(value));
+        return this;
+    }
+
+    /** Adds all given properties; wraps {@link TypedMessageBuilder#properties(Map)}. */
+    public PulsarMessageBuilder<T> properties(Map<String, String> properties) {
+        this.properties.putAll(checkNotNull(properties));
+        return this;
+    }
+
+    /** Sets the sequence id; wraps {@link TypedMessageBuilder#sequenceId(long)}. */
+    public PulsarMessageBuilder<T> sequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+        return this;
+    }
+
+    /** Non-null cluster list; wraps {@link TypedMessageBuilder#replicationClusters(List)}. */
+    public PulsarMessageBuilder<T> replicationClusters(List<String> replicationClusters) {
+        this.replicationClusters = checkNotNull(replicationClusters);
+        return this;
+    }
+
+    /** Turns replication off; wraps {@link TypedMessageBuilder#disableReplication()}. */
+    public PulsarMessageBuilder<T> disableReplication() {
+        this.disableReplication = true;
+        return this;
+    }
+
+    /** Creates the {@link PulsarMessage}; a schema must have been set through {@link #value}. */
+    public PulsarMessage<T> build() {
+        checkNotNull(schema, "Schema should be provided.");
+        return new PulsarMessage<>(
+                orderingKey,
+                key,
+                eventTime,
+                schema,
+                value,
+                properties,
+                sequenceId,
+                replicationClusters,
+                disableReplication);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
new file mode 100644
index 0000000..0d5aaf0
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * {@link PulsarSerializationSchema} backed by a Pulsar schema. Schema evolution is supported out
+ * of the box by this implementation.
+ */
+@Internal
+public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+    private static final long serialVersionUID = -2567052498398184194L;
+
+    private final PulsarSchema<IN> pulsarSchema;
+
+    public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) {
+        this.pulsarSchema = pulsarSchema;
+    }
+
+    @Override
+    public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) {
+        Schema<IN> nativeSchema = pulsarSchema.getPulsarSchema();
+
+        if (!sinkContext.isEnableSchemaEvolution()) {
+            // Schema evolution is off: encode here and send the bytes with Schema.BYTES.
+            PulsarMessageBuilder<byte[]> rawBuilder = new PulsarMessageBuilder<>();
+            byte[] encoded = nativeSchema.encode(element);
+            return rawBuilder.value(Schema.BYTES, encoded).build();
+        }
+
+        // Schema evolution is on: pass the element along with its own Pulsar schema.
+        PulsarMessageBuilder<IN> typedBuilder = new PulsarMessageBuilder<>();
+        typedBuilder.value(nativeSchema, element);
+        return typedBuilder.build();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
new file mode 100644
index 0000000..da7f706
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.schema.KeyValue;
+
+import java.io.Serializable;
+
+/**
+ * Describes how records are serialized into a {@link PulsarMessage} for writing to Pulsar.
+ *
+ * @param <IN> The message type sent to Pulsar.
+ */
+@PublicEvolving
+public interface PulsarSerializationSchema<IN> extends Serializable {
+
+    /**
+     * One-time setup hook for the schema. It is called before the actual working method {@link
+     * #serialize(Object, PulsarSinkContext)}. The default implementation does nothing.
+     *
+     * <p>Use the given {@link InitializationContext} to access additional features such as
+     * registering user metrics.
+     *
+     * @param initializationContext Contextual information that can be used during initialization.
+     * @param sinkContext Runtime information i.e. partitions, subtaskId.
+     * @param sinkConfiguration All the configure options for the Pulsar sink. You can add custom
+     *     options.
+     */
+    default void open(
+            InitializationContext initializationContext,
+            PulsarSinkContext sinkContext,
+            SinkConfiguration sinkConfiguration)
+            throws Exception {
+        // Intentionally empty; implementations override this when they need setup work.
+    }
+
+    /**
+     * Converts one element into the {@link PulsarMessage} to send. Either serialize it into bytes
+     * paired with {@link Schema#BYTES}, or convert it to an instance of a new type together with
+     * a {@link Schema}. The returned message can be built by {@link PulsarMessageBuilder}, whose
+     * methods are equal to those of {@link TypedMessageBuilder}.
+     *
+     * @param element Element to be serialized.
+     * @param sinkContext Context to provide extra information.
+     */
+    PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext);
+
+    /**
+     * Adapts a Flink {@link SerializationSchema}: the message is serialized into a byte array
+     * and sent to Pulsar with {@link Schema#BYTES}.
+     */
+    static <T> PulsarSerializationSchema<T> flinkSchema(
+            SerializationSchema<T> serializationSchema) {
+        return new PulsarSerializationSchemaWrapper<>(serializationSchema);
+    }
+
+    /**
+     * Wraps a Pulsar {@link Schema} instance. By default the message is serialized into bytes and
+     * sent as {@link Schema#BYTES}; sending it with the given schema instead is enabled by {@link
+     * PulsarSinkBuilder#enableSchemaEvolution()}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+     * types</a> here.
+     *
+     * @param schema The Pulsar schema of the message type.
+     */
+    static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema) {
+        return new PulsarSchemaWrapper<>(new PulsarSchema<>(schema));
+    }
+
+    /**
+     * Wraps a Pulsar {@link Schema} instance. By default the message is serialized into bytes and
+     * sent as {@link Schema#BYTES}; sending it with the given schema instead is enabled by {@link
+     * PulsarSinkBuilder#enableSchemaEvolution()}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     *
+     * @param typeClass The class of the type {@code T} handled by the schema.
+     */
+    static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema, Class<T> typeClass) {
+        return new PulsarSchemaWrapper<>(new PulsarSchema<>(schema, typeClass));
+    }
+
+    /**
+     * Wraps a Pulsar {@link KeyValue} {@link Schema} instance. By default the message is
+     * serialized into bytes and sent as {@link Schema#BYTES}; sending it with the given schema
+     * instead is enabled by {@link PulsarSinkBuilder#enableSchemaEvolution()}. The key and value
+     * classes are passed on to {@link PulsarSchema}.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     */
+    static <K, V> PulsarSerializationSchema<KeyValue<K, V>> pulsarSchema(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        PulsarSchema<KeyValue<K, V>> keyValueSchema =
+                new PulsarSchema<>(schema, keyClass, valueClass);
+        return new PulsarSchemaWrapper<>(keyValueSchema);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java
new file mode 100644
index 0000000..716d2db
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+
+import org.apache.pulsar.client.api.Schema;
+
+/** Adapts Flink's {@link SerializationSchema} to {@link PulsarSerializationSchema}. */
+@Internal
+public class PulsarSerializationSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+    private static final long serialVersionUID = 4948155843623161119L;
+
+    private final SerializationSchema<IN> serializationSchema;
+
+    public PulsarSerializationSchemaWrapper(SerializationSchema<IN> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public void open(
+            InitializationContext initializationContext,
+            PulsarSinkContext sinkContext,
+            SinkConfiguration sinkConfiguration)
+            throws Exception {
+        this.serializationSchema.open(initializationContext);
+    }
+
+    @Override
+    public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) {
+        // The wrapped Flink schema produces bytes, so the message is sent with Schema.BYTES.
+        PulsarMessageBuilder<byte[]> messageBuilder = new PulsarMessageBuilder<>();
+        byte[] payload = serializationSchema.serialize(element);
+        messageBuilder.value(Schema.BYTES, payload);
+        return messageBuilder.build();
+    }
+}