Posted to issues@flink.apache.org by "anupamaggarwal (via GitHub)" <gi...@apache.org> on 2024/03/11 13:55:18 UTC

[PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

anupamaggarwal opened a new pull request, #24482:
URL: https://github.com/apache/flink/pull/24482

   
   ## What is the purpose of the change
   
   Support Debezium Protobuf Confluent Format
   
   
   ## Brief change log
   
   - Add support for protobuf-confluent format 
   
   
   ## Verifying this change
   
   TBD 
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (TBD)
     - The serializers: (yes)
     - The runtime per-record code paths (performance sensitive): (yes)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (tbd)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "anupamaggarwal (via GitHub)" <gi...@apache.org>.
anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1520977487


##########
flink-formats/flink-protobuf-confluent-registry/pom.xml:
##########
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-formats</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.20-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-protobuf-confluent-registry</artifactId>
+
+	<name>Flink : Formats : Protobuf confluent registry</name>
+
+	<properties>
+		<confluent.version>7.5.3</confluent.version>
+		<schema.registry.version>7.5.0-22</schema.registry.version>

Review Comment:
   thanks for catching, fixed.





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "anupamaggarwal (via GitHub)" <gi...@apache.org>.
anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1548361941


##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerializationSchema.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure with a Protobuf
+ * format.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * ProtoRegistryDeserializationSchema}.
+ */
+public class ProtoRegistrySerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private transient RowDataToProtoConverters.RowDataToProtoConverter runtimeConverter;
+
+    private transient SchemaRegistryCoder schemaCoder;
+    /** Output stream to write message to. */
+    private transient ByteArrayOutputStream arrayOutputStream;
+
+    public ProtoRegistrySerializationSchema(SchemaRegistryConfig registryConfig, RowType rowType) {
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.schemaRegistryConfig = Preconditions.checkNotNull(registryConfig);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.runtimeConverter =
+                RowDataToProtoConverters.createConverter(rowType, schema.toDescriptor());
+        this.arrayOutputStream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            final DynamicMessage converted = (DynamicMessage) runtimeConverter.convert(row);
+
+            arrayOutputStream.reset();
+            schemaCoder.writeSchema(arrayOutputStream);
+            final ByteBuffer buffer = writeMessageIndexes();
+            arrayOutputStream.write(buffer.array());
+            converted.writeTo(arrayOutputStream);
+            return arrayOutputStream.toByteArray();
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(String.format("Could not serialize row '%s'.", row), t);
+        }
+    }
+
+    private static ByteBuffer writeMessageIndexes() {
+        //write empty message indices for now
+        ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
+        ByteUtils.writeVarint(0, buffer);
+        return buffer;

Review Comment:
   thanks @klam-shop, I added logic for handling message indexes, pls lmk if it does not make sense.
   - For the case where the schema ID is specified explicitly through config, the message indexes are determined via another config parameter, message-name. They are skipped during deserialization, since the correct schema descriptor to use is determined during initialization (from message-name).
   - For the case where a pre-registered schema ID is not available, we write empty message indexes. During deserialization the message indexes are read from the input stream to infer the right schema to use.
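   
   For context, the Confluent wire format places the message indexes between the magic-byte/schema-id header and the protobuf payload, encoded as zigzag varints: a count, followed by one index per nesting level, with a single `0` acting as shorthand for the first top-level message. The snippet below is only an illustrative sketch of those two paths, built on the Kafka `ByteUtils` varint helpers this class already imports; it is not the exact helper code added in the PR.
   ```java
   // Sketch only. Assumes java.nio.ByteBuffer, java.util.List/ArrayList and
   // org.apache.kafka.common.utils.ByteUtils (all already available in
   // ProtoRegistrySerializationSchema's context).
   static ByteBuffer writeMessageIndexes(List<Integer> indexes) {
       if (indexes.isEmpty() || (indexes.size() == 1 && indexes.get(0) == 0)) {
           // Shorthand for the first top-level message: a single varint 0.
           ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
           ByteUtils.writeVarint(0, buffer);
           return buffer;
       }
       int size = ByteUtils.sizeOfVarint(indexes.size());
       for (int index : indexes) {
           size += ByteUtils.sizeOfVarint(index);
       }
       ByteBuffer buffer = ByteBuffer.allocate(size);
       ByteUtils.writeVarint(indexes.size(), buffer); // number of indexes
       for (int index : indexes) {
           ByteUtils.writeVarint(index, buffer); // one index per nesting level
       }
       return buffer;
   }

   // Deserialization side: read (or skip) the message-index block before the payload.
   static List<Integer> readMessageIndexes(ByteBuffer buffer) {
       int count = ByteUtils.readVarint(buffer);
       List<Integer> indexes = new ArrayList<>(count);
       for (int i = 0; i < count; i++) {
           indexes.add(ByteUtils.readVarint(buffer));
       }
       return indexes; // an empty list means "first top-level message"
   }
   ```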





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "anupamaggarwal (via GitHub)" <gi...@apache.org>.
anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1564639354


##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent.utils;
+
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkToProtoSchemaConverter}. */
+@ExtendWith(TestLoggerExtension.class)
+class FlinkToProtoSchemaConverterTest {

Review Comment:
   Hi @klam-shop, I don't think I understood your comment completely. If we are converting from the rowType, the protobuf schema will only have 1 (root) element. IIUC, we won't have a case similar to what is outlined in the [wire-format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) example.
   I added a test, testMessageIndexHandlingInferredFromRowTypeWithConnectDecoder, to verify that Connect's deserializer is happy with an empty message index.
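   
   The Connect-side check can be reduced to a small helper like the sketch below. This is only an illustrative sketch, not the actual test body: the `ProtobufConverter(SchemaRegistryClient)` constructor is assumed, `serialized` is assumed to have been produced by the Flink serialization schema against the same (mock) registry client, and the other classes are the ones already imported in the test.
   ```java
   // Sketch: verify that Kafka Connect's Protobuf converter accepts bytes written
   // by the Flink serializer (i.e. it is happy with the empty message-index block).
   static void assertReadableByConnect(SchemaRegistryClient client, byte[] serialized) {
       ProtobufConverter converter = new ProtobufConverter(client); // assumed constructor
       converter.configure(
               Collections.singletonMap(
                       AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test"),
               false);
       SchemaAndValue connectData = converter.toConnectData("topic", serialized);
       // If the message indexes were malformed, toConnectData would fail before this point.
       assertThat(connectData.value()).isInstanceOf(Struct.class);
   }
   ```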





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "jeremyb-ps (via GitHub)" <gi...@apache.org>.
jeremyb-ps commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1520484664


##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoToRowDataConverters.java:
##########
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.Type;
+import com.google.protobuf.Descriptors.OneofDescriptor;
+import com.google.protobuf.Message;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.time.LocalDate;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime converters between {@link com.google.protobuf.Message} and {@link
+ * org.apache.flink.table.data.RowData}.
+ */
+public class ProtoToRowDataConverters {
+
+    private static final String KEY_FIELD = "key";
+    private static final String VALUE_FIELD = "value";
+
+    /**
+     * Runtime converter that converts Protobuf data structures into objects of Flink Table & SQL
+     * internal data structures.
+     */
+    @FunctionalInterface
+    public interface ProtoToRowDataConverter extends Serializable {
+
+        Object convert(Object object) throws IOException;
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter. */
+    public static ProtoToRowDataConverter createConverter(
+            Descriptor readSchema, RowType targetType) {
+        if (readSchema.getRealOneofs().isEmpty()) {
+            return createNoOneOfRowConverter(readSchema, targetType);
+        } else {
+            return createOneOfRowConverter(readSchema, targetType);
+        }
+    }
+
+    private static ProtoToRowDataConverter createOneOfRowConverter(
+            Descriptor readSchema, RowType targetType) {
+        final Map<String, OneofDescriptor> oneOfDescriptors =
+                readSchema.getRealOneofs().stream()
+                        .collect(Collectors.toMap(OneofDescriptor::getName, Function.identity()));
+        final Map<String, FieldDescriptor> fieldDescriptors =
+                readSchema.getFields().stream()
+                        .filter(fieldDescriptor -> fieldDescriptor.getRealContainingOneof() == null)
+                        .collect(Collectors.toMap(FieldDescriptor::getName, Function.identity()));
+
+        final int arity = targetType.getFieldCount();
+        final List<OneOfDescriptorWithConverter> oneOfConverters =
+                targetType.getFields().stream()
+                        .filter(field -> oneOfDescriptors.containsKey(field.getName()))
+                        .map(
+                                rowField -> {
+                                    final OneofDescriptor fieldDescriptor =
+                                            oneOfDescriptors.get(rowField.getName());
+                                    return new OneOfDescriptorWithConverter(
+                                            fieldDescriptor,
+                                            createConverter(
+                                                    fieldDescriptor, (RowType) rowField.getType()));
+                                })
+                        .collect(Collectors.toList());
+        final List<FieldDescriptorWithConverter> fieldConverters =
+                targetType.getFields().stream()
+                        .filter(rowField -> !oneOfDescriptors.containsKey(rowField.getName()))
+                        .map(
+                                rowField -> {
+                                    final FieldDescriptor fieldDescriptor =
+                                            fieldDescriptors.get(rowField.getName());
+                                    return new FieldDescriptorWithConverter(
+                                            fieldDescriptor,
+                                            createFieldConverter(
+                                                    fieldDescriptor, rowField.getType()));
+                                })
+                        .collect(Collectors.toList());
+        return new ProtoToRowDataConverter() {
+            @Override
+            public Object convert(Object object) throws IOException {
+                final GenericRowData row = new GenericRowData(arity);
+                final Message message = (Message) object;
+                int i = 0;
+                for (OneOfDescriptorWithConverter descriptorWithConverter : oneOfConverters) {
+                    final OneofDescriptor descriptor = descriptorWithConverter.descriptor;
+                    final ProtoToRowDataConverter converter = descriptorWithConverter.converter;
+                    if (message.hasOneof(descriptor)) {
+                        row.setField(i, converter.convert(message));
+                    }
+                    i++;
+                }
+                for (FieldDescriptorWithConverter descriptorWithConverter : fieldConverters) {
+                    final FieldDescriptor fieldDescriptor = descriptorWithConverter.descriptor;
+                    final ProtoToRowDataConverter converter = descriptorWithConverter.converter;
+                    if (!fieldDescriptor.hasPresence() || message.hasField(fieldDescriptor)) {
+                        row.setField(i, converter.convert(message.getField(fieldDescriptor)));
+                    }
+                    i++;
+                }
+
+                return row;
+            }
+        };
+    }
+
+    private static ProtoToRowDataConverter createNoOneOfRowConverter(
+            Descriptor readSchema, RowType targetType) {
+        final Map<String, FieldDescriptor> fieldDescriptors =
+                readSchema.getFields().stream()
+                        .collect(Collectors.toMap(FieldDescriptor::getName, Function.identity()));
+
+        final int arity = targetType.getFieldCount();
+        final FieldDescriptorWithConverter[] fieldConverters =
+                targetType.getFields().stream()
+                        .map(
+                                rowField -> {
+                                    final FieldDescriptor fieldDescriptor =
+                                            fieldDescriptors.get(rowField.getName());
+                                    return new FieldDescriptorWithConverter(
+                                            fieldDescriptor,
+                                            createFieldConverter(
+                                                    fieldDescriptor, rowField.getType()));
+                                })
+                        .toArray(FieldDescriptorWithConverter[]::new);
+        return new ProtoToRowDataConverter() {
+            @Override
+            public Object convert(Object object) throws IOException {
+                final GenericRowData row = new GenericRowData(arity);
+                final Message message = (Message) object;
+                for (int i = 0; i < arity; i++) {
+                    final FieldDescriptor fieldDescriptor = fieldConverters[i].descriptor;
+                    final ProtoToRowDataConverter converter = fieldConverters[i].converter;
+                    if (!fieldDescriptor.hasPresence() || message.hasField(fieldDescriptor)) {
+                        row.setField(i, converter.convert(message.getField(fieldDescriptor)));
+                    }
+                }
+
+                return row;
+            }
+        };
+    }
+
+    private static class FieldDescriptorWithConverter {
+        final FieldDescriptor descriptor;
+        final ProtoToRowDataConverter converter;
+
+        private FieldDescriptorWithConverter(
+                FieldDescriptor descriptor, ProtoToRowDataConverter converter) {
+            this.descriptor = descriptor;
+            this.converter = converter;
+        }
+    }
+
+    private static class OneOfDescriptorWithConverter {
+        final OneofDescriptor descriptor;
+        final ProtoToRowDataConverter converter;
+
+        private OneOfDescriptorWithConverter(
+                OneofDescriptor descriptor, ProtoToRowDataConverter converter) {
+            this.descriptor = descriptor;
+            this.converter = converter;
+        }
+    }
+
+    private static ProtoToRowDataConverter createConverter(
+            OneofDescriptor readSchema, RowType targetType) {
+        final int arity = targetType.getFieldCount();
+        final Map<FieldDescriptor, Pair<ProtoToRowDataConverter, Integer>> fieldConverters =
+                new HashMap<>();
+        for (int i = 0; i < targetType.getFieldCount(); i++) {
+            final FieldDescriptor fieldDescriptor = readSchema.getField(i);
+            fieldConverters.put(
+                    fieldDescriptor,
+                    Pair.of(createFieldConverter(fieldDescriptor, targetType.getTypeAt(i)), i));
+        }
+        return new ProtoToRowDataConverter() {
+            @Override
+            public Object convert(Object object) throws IOException {
+                final Message message = (Message) object;
+                final GenericRowData row = new GenericRowData(arity);
+                final FieldDescriptor oneofFieldDescriptor =
+                        message.getOneofFieldDescriptor(readSchema);
+                final Pair<ProtoToRowDataConverter, Integer> converters =
+                        fieldConverters.get(oneofFieldDescriptor);
+                row.setField(
+                        converters.getRight(),
+                        converters.getLeft().convert(message.getField(oneofFieldDescriptor)));
+                return row;
+            }
+        };
+    }
+
+    private static ProtoToRowDataConverter createFieldConverter(
+            FieldDescriptor readSchema, LogicalType targetType) {
+        final Type schemaType = readSchema.getType();
+        switch (targetType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return createStringConverter(targetType, schemaType);
+            case BOOLEAN:
+                return createBooleanConverter(targetType, schemaType);
+            case BINARY:
+            case VARBINARY:
+                return createBinaryConverter(targetType, schemaType);
+            case TIME_WITHOUT_TIME_ZONE:
+                return createTimeConverter();
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return createTimestampConverter();
+            case DATE:
+                return createDateConverter();
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) targetType);
+            case TINYINT:
+                return createTinyIntConverter(targetType, schemaType);
+            case SMALLINT:
+                return createSmallIntConverter(targetType, schemaType);
+            case INTEGER:
+                return createIntegerConverter(targetType, schemaType);
+            case BIGINT:
+                return createBigintConverter(targetType, schemaType);
+            case FLOAT:
+                return createFloatConverter(targetType, schemaType);
+            case DOUBLE:
+                return createDoubleConverter(targetType, schemaType);
+            case ARRAY:
+                return createArrayConverter(readSchema, (ArrayType) targetType);
+            case MULTISET:
+                return createMultisetConverter(readSchema, (MultisetType) targetType);
+            case MAP:
+                return createMapConverter(readSchema, (MapType) targetType);
+            case ROW:
+                return createConverter(readSchema.getMessageType(), (RowType) targetType);
+            case NULL:
+            case RAW:
+            case SYMBOL:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case INTERVAL_DAY_TIME:
+            case INTERVAL_YEAR_MONTH:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case UNRESOLVED:
+            default:
+                throw new IllegalStateException(
+                        "Couldn't translate unsupported type " + targetType.getTypeRoot() + ".");
+        }
+    }
+
+    private static ProtoToRowDataConverter createArrayConverter(
+            FieldDescriptor readSchema, ArrayType targetType) {
+        final ProtoToRowDataConverter elementConverter =
+                createFieldConverter(readSchema, targetType.getElementType());
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(targetType.getElementType());
+        return new ProtoToRowDataConverter() {
+            @Override
+            public Object convert(Object object) throws IOException {
+                final Collection<?> list = (Collection<?>) object;
+                final int length = list.size();
+                final Object[] array = (Object[]) Array.newInstance(elementClass, length);
+                int i = 0;
+                for (Object o : list) {
+                    array[i] = elementConverter.convert(o);
+                    i++;
+                }
+                return new GenericArrayData(array);
+            }
+        };
+    }
+
+    private static ProtoToRowDataConverter createMapConverter(
+            FieldDescriptor readSchema, MapType targetType) {
+        final FieldDescriptor keySchema = readSchema.getMessageType().findFieldByName(KEY_FIELD);
+        final FieldDescriptor valueSchema =
+                readSchema.getMessageType().findFieldByName(VALUE_FIELD);
+        final ProtoToRowDataConverter keyConverter =
+                createFieldConverter(keySchema, targetType.getKeyType());
+        final ProtoToRowDataConverter valueConverter =
+                createFieldConverter(valueSchema, targetType.getValueType());
+        return createMapLikeConverter(keyConverter, valueConverter);
+    }
+
+    private static ProtoToRowDataConverter createMultisetConverter(
+            FieldDescriptor readSchema, MultisetType targetType) {
+        final FieldDescriptor keySchema = readSchema.getMessageType().findFieldByName(KEY_FIELD);
+        final FieldDescriptor valueSchema =
+                readSchema.getMessageType().findFieldByName(VALUE_FIELD);
+        final ProtoToRowDataConverter keyConverter =
+                createFieldConverter(keySchema, targetType.getElementType());
+        final ProtoToRowDataConverter valueConverter =
+                createFieldConverter(valueSchema, new IntType(false));
+        return createMapLikeConverter(keyConverter, valueConverter);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static ProtoToRowDataConverter createMapLikeConverter(
+            ProtoToRowDataConverter keyConverter, ProtoToRowDataConverter valueConverter) {
+        return new ProtoToRowDataConverter() {
+            @Override
+            public Object convert(Object object) throws IOException {
+                final Collection<? extends Message> protoMap =
+                        (Collection<? extends Message>) object;
+                final Map<Object, Object> map = new HashMap<>();
+                for (Message message : protoMap) {
+                    final Descriptor descriptor = message.getDescriptorForType();
+                    final Object elemKey = message.getField(descriptor.findFieldByName(KEY_FIELD));
+                    final Object elemValue =
+                            message.getField(descriptor.findFieldByName(VALUE_FIELD));
+
+                    final Object key = keyConverter.convert(elemKey);
+                    final Object value = valueConverter.convert(elemValue);
+                    map.put(key, value);
+                }
+                return new GenericMapData(map);
+            }
+        };
+    }
+
+    private static ProtoToRowDataConverter createStringConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.STRING || schemaType == Type.ENUM) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return StringData.fromString(object.toString());
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return StringData.fromString(extractValueField(object).toString());
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createBooleanConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.BOOL) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return object;
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return extractValueField(object);
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createBinaryConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.BYTES) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((ByteString) object).toByteArray();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((ByteString) extractValueField(object)).toByteArray();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createTimeConverter() {
+        return new ProtoToRowDataConverter() {
+
+            @Override
+            public Object convert(Object object) {
+                final Message message = (Message) object;
+                int hours = 0;
+                int minutes = 0;
+                int seconds = 0;
+                int nanos = 0;
+                for (Map.Entry<FieldDescriptor, Object> entry : message.getAllFields().entrySet()) {
+                    if (entry.getKey().getName().equals("hours")) {
+                        hours = ((Number) entry.getValue()).intValue();
+                    } else if (entry.getKey().getName().equals("minutes")) {
+                        minutes = ((Number) entry.getValue()).intValue();
+                    } else if (entry.getKey().getName().equals("seconds")) {
+                        seconds = ((Number) entry.getValue()).intValue();
+                    } else if (entry.getKey().getName().equals("nanos")) {
+                        nanos = ((Number) entry.getValue()).intValue();
+                    }
+                }
+
+                return hours * 3600000 + minutes * 60000 + seconds * 1000 + nanos / 1000_000;
+            }
+        };
+    }
+
+    private static ProtoToRowDataConverter createTinyIntConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.INT32 || schemaType == Type.SINT32 || schemaType == Type.SFIXED32) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).byteValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).byteValue();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createSmallIntConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.INT32 || schemaType == Type.SINT32 || schemaType == Type.SFIXED32) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).shortValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).shortValue();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createIntegerConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.INT32 || schemaType == Type.SINT32 || schemaType == Type.SFIXED32) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).intValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).intValue();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createDoubleConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.DOUBLE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).doubleValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).doubleValue();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createFloatConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.FLOAT) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).floatValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).floatValue();
+                }
+            };
+        } else {
+            throw unexpectedTypeForSchema(schemaType, targetType.getTypeRoot());
+        }
+    }
+
+    private static ProtoToRowDataConverter createBigintConverter(
+            LogicalType targetType, Type schemaType) {
+        if (schemaType == Type.UINT32
+                || schemaType == Type.FIXED32
+                || schemaType == Type.INT64
+                || schemaType == Type.UINT64
+                || schemaType == Type.SINT64
+                || schemaType == Type.FIXED64
+                || schemaType == Type.SFIXED64) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) object).longValue();
+                }
+            };
+        } else if (schemaType == Type.MESSAGE) {
+            return new ProtoToRowDataConverter() {
+                @Override
+                public Object convert(Object object) {
+                    return ((Number) extractValueField(object)).byteValue();

Review Comment:
   ```suggestion
                       return ((Number) extractValueField(object)).longValue();
   ```





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "rmetzger (via GitHub)" <gi...@apache.org>.
rmetzger commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1520055438


##########
flink-formats/flink-protobuf-confluent-registry/pom.xml:
##########
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-formats</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.20-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-protobuf-confluent-registry</artifactId>
+
+	<name>Flink : Formats : Protobuf confluent registry</name>
+
+	<properties>
+		<confluent.version>7.5.3</confluent.version>
+		<schema.registry.version>7.5.0-22</schema.registry.version>

Review Comment:
   Looks like this dependency is not available publicly: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58222&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "klam-shop (via GitHub)" <gi...@apache.org>.
klam-shop commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1565935343


##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent.utils;
+
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkToProtoSchemaConverter}. */
+@ExtendWith(TestLoggerExtension.class)
+class FlinkToProtoSchemaConverterTest {

Review Comment:
   > if we are converting from the rowType the protobuf schema will only have 1 (root) element. IIUC, we won't have a case which is similar to what is outlined in [wire-format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) example.
   
   Yep, I agree with that. My point was just that the serializer code assumes 1 root element and is not tightly coupled to the rowType -> protobuf schema code.
   
   If the rowType -> protobuf schema code changes such that there's more than 1 root element, but the serializer is not updated, it will continue to write empty message indices, which would be incorrect. So, to avoid that scenario, I suggest adding a test that asserts there's only 1 root element (and maybe a comment saying that if you're removing this test, you may need to update the serialization code to handle message indices).
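   
   For illustration, such a guard test might look like the sketch below. The `convertToProtoDescriptor` helper is a hypothetical stand-in (FlinkToProtoSchemaConverter's exact entry point isn't shown in this thread); the descriptor assertions themselves are plain protobuf-java and AssertJ, using the imports already present in FlinkToProtoSchemaConverterTest plus `org.junit.jupiter.api.Test`.
   ```java
   // Guard-test sketch: if this assertion ever has to change, the empty
   // message-index shortcut in ProtoRegistrySerializationSchema must be revisited too.
   @Test
   void testGeneratedSchemaHasSingleRootMessage() {
       RowType rowType =
               new RowType(Collections.singletonList(new RowField("name", new VarCharType())));
       // Hypothetical helper wrapping the rowType -> protobuf schema conversion.
       Descriptor root = convertToProtoDescriptor(rowType);
       // The root message must be the first (and only) top-level type, so that writing
       // the single-byte "0" message-index shorthand stays correct.
       assertThat(root.getFile().getMessageTypes()).hasSize(1);
       assertThat(root.getFile().getMessageTypes().get(0)).isEqualTo(root);
       assertThat(root.getIndex()).isEqualTo(0);
   }
   ```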





Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "klam-shop (via GitHub)" <gi...@apache.org>.
klam-shop commented on PR #24482:
URL: https://github.com/apache/flink/pull/24482#issuecomment-2099270127

   👋 Thanks for working on this Anupam. Any updates on this PR? 




Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "klam-shop (via GitHub)" <gi...@apache.org>.
klam-shop commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1551822003


##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent.utils;
+
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkToProtoSchemaConverter}. */
+@ExtendWith(TestLoggerExtension.class)
+class FlinkToProtoSchemaConverterTest {

Review Comment:
   If we're relying on the root message being first in the generated ProtoSchema so that we can write empty message indices when serializing, can we test that?



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerializationSchema.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure with a Protobuf
+ * format.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * ProtoRegistryDeserializationSchema}.
+ */
+public class ProtoRegistrySerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private transient RowDataToProtoConverters.RowDataToProtoConverter runtimeConverter;
+
+    private transient SchemaRegistryCoder schemaCoder;
+    /** Output stream to write message to. */
+    private transient ByteArrayOutputStream arrayOutputStream;
+
+    public ProtoRegistrySerializationSchema(SchemaRegistryConfig registryConfig, RowType rowType) {
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.schemaRegistryConfig = Preconditions.checkNotNull(registryConfig);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.runtimeConverter =
+                RowDataToProtoConverters.createConverter(rowType, schema.toDescriptor());
+        this.arrayOutputStream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            final DynamicMessage converted = (DynamicMessage) runtimeConverter.convert(row);
+
+            arrayOutputStream.reset();
+            schemaCoder.writeSchema(arrayOutputStream);
+            final ByteBuffer buffer = writeMessageIndexes();
+            arrayOutputStream.write(buffer.array());
+            converted.writeTo(arrayOutputStream);
+            return arrayOutputStream.toByteArray();
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(String.format("Could not serialize row '%s'.", row), t);
+        }
+    }
+
+    private static ByteBuffer writeMessageIndexes() {
+        //write empty message indices for now
+        ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
+        ByteUtils.writeVarint(0, buffer);
+        return buffer;

Review Comment:
   Looks good from a quick look, thanks @anupamaggarwal !!



##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerialisationDeserialisationTest.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.formats.protobuf.registry.confluent.utils.MockInitializationContext;
+import org.apache.flink.formats.protobuf.registry.confluent.utils.ProtoToFlinkSchemaConverter;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.connect.protobuf.ProtobufConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Smoke tests for checking {@link ProtoRegistrySerializationSchema} and {@link
+ * ProtoRegistryDeserializationSchema}.
+ *
+ * <p>For more thorough tests on converting different types see {@link RowDataToProtoConvertersTest}
+ * and/or {@link ProtoToRowDataConvertersTest}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public class ProtoRegistrySerialisationDeserialisationTest {

Review Comment:
   Nitpick: rename the file and class to fix the spelling of Deserialization.
   ```suggestion
   public class ProtoRegistrySerialisationDeserializationTest {
   ```



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+    /**
+     * Creates a {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+     * where the schema has already been setup before-hand/exists in Confluent Schema Registry.
+     *
+     * <p>Useful in scenarios where users want to be more explicit with schemas used. In these cases
+     * the external schema specified through schemaId will take precedence for encoding/decoding
+     * data. Also, the step of registering with schemaRegistry during serialization, will be
+     * skipped.
+     *
+     * <p>A single Schema Registry Protobuf entry may contain multiple Protobuf messages, some of
+     * which may have nested messages. The messageName identifies the exact message/schema to use
+     * for serialization/deserialization. Consider the following protobuf message
+     *
+     * <pre>
+     * package test.package;
+     * message MessageA {
+     *     message MessageB {
+     *         message MessageC {
+     *         ...
+     *         }
+     *     }
+     *     message MessageD {
+     *     ...
+     *     }
+     *     message MessageE {
+     *         message MessageF {
+     *         ...
+     *         }
+     *         message MessageG {
+     *         ...
+     *         }
+     *     ...
+     *     }
+     * ...
+     * }
+     * </pre>
+     *
+     * <p>In order to use messageD the messageName should contain the value of
+     * test.package.messageD. Similarly, for messageF to be used messageName should contain
+     * test.package.MessageE.MessageF.
+     *
+     * @param schemaId SchemaId for external schema referenced for encoding/decoding of payload.
+     * @param messageName Optional message name to be used to select the right {@link
+     *     com.google.protobuf.Message} for Serialization/Deserialization. In absence of
+     *     messageName the outermost message will be used.
+     * @param schemaRegistryClient client handle to Schema Registry {@link
+     *     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+     * @return
+     */
+    public static SchemaCoder createForPreRegisteredSchema(
+            int schemaId, @Nullable String messageName, SchemaRegistryClient schemaRegistryClient) {
+        return new PreRegisteredSchemaCoder(schemaId, messageName, schemaRegistryClient);
+    }
+
+    /**
+     * Creates a default schema coder.
+     *
+     * <p>For serialization schema coder will infer the schema from Flink {@link
+     * org.apache.flink.table.types.logical.RowType}. Schema obtained from rowType will also be
+     * registered to Schema Registry using the subject passed in by invoking {@link
+     * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+     * io.confluent.kafka.schemaregistry.ParsedSchema)}.
+     *
+     * <p>For deserialization schema coder will infer schema from InputStream. In cases where
+     * messageIndexes indicate using a nested schema, the appropriate nested schema will be used.
+     *
+     * @param subject Subject to use for registering schema (only required for serialization).
+     * @param rowType Flink Row type.
+     * @param schemaRegistryClient Client for SchemaRegistry
+     * @return SchemaCoder to use.
+     */
+    public static SchemaCoder createDefault(
+            String subject, RowType rowType, SchemaRegistryClient schemaRegistryClient) {
+        return new DefaultSchemaCoder(subject, rowType, schemaRegistryClient);
+    }
+
+    /**
+     * Default implementation of SchemaCoder.
+     *
+     * <p>Parses schema information from inputStream for de-serialization. For Serialization, uses
+     * Flink Row Type to infer schema and registers this schema with Schema Registry.
+     */
+    static class DefaultSchemaCoder extends SchemaCoder {
+        private static final String ROW = "row";
+        private static final String PACKAGE = "io.confluent.generated";
+        private static final List<Integer> DEFAULT_INDEX = Collections.singletonList(0);
+        /** Subject can be nullable in case coder is only used for deserialization. */
+        private @Nullable final String subject;
+
+        private final ProtobufSchema rowSchema;
+        private final SchemaRegistryClient schemaRegistryClient;
+
+        public DefaultSchemaCoder(
+                @Nullable String subject,
+                RowType rowType,
+                SchemaRegistryClient schemaRegistryClient) {
+            this.subject = subject;
+            rowSchema =
+                    FlinkToProtoSchemaConverter.fromFlinkRowType(
+                            Preconditions.checkNotNull(rowType), ROW, PACKAGE);
+            this.schemaRegistryClient = Preconditions.checkNotNull(schemaRegistryClient);
+        }
+
+        public static MessageIndexes readMessageIndex(DataInputStream input) throws IOException {
+
+            int size = ByteUtils.readVarint(input);
+            if (size == 0) {
+                return new MessageIndexes(DEFAULT_INDEX);
+            } else {
+                List<Integer> indexes = new ArrayList<>(size);
+
+                for (int i = 0; i < size; ++i) {
+                    indexes.add(ByteUtils.readVarint(input));
+                }
+                return new MessageIndexes(indexes);
+            }
+        }
+
+        @Override
+        public ProtobufSchema readSchema(InputStream in) throws IOException {
+            DataInputStream dataInputStream = new DataInputStream(in);
+
+            if (dataInputStream.readByte() != 0) {
+                throw new IOException("Unknown data format. Magic number does not match");
+            } else {
+                int schemaId = dataInputStream.readInt();
+                try {
+                    ProtobufSchema schema =
+                            (ProtobufSchema) schemaRegistryClient.getSchemaById(schemaId);
+                    MessageIndexes indexes = readMessageIndex(dataInputStream);
+                    String name = schema.toMessageName(indexes);
+                    schema = schema.copy(name);
+                    return schema;
+                } catch (RestClientException e) {
+                    throw new IOException(
+                            format("Could not find schema with id %s in registry", schemaId), e);
+                }
+            }
+        }
+
+        @Override
+        public ProtobufSchema writerSchema() {
+            return rowSchema;
+        }
+
+        @Override
+        public void writeSchema(OutputStream out) throws IOException {
+            out.write(CONFLUENT_MAGIC_BYTE);
+            int schemaId = 0;
+            try {
+                schemaId = schemaRegistryClient.register(subject, rowSchema);
+                writeInt(out, schemaId);
+                final ByteBuffer buffer = emptyMessageIndexes();

Review Comment:
   Might be worth adding a comment to clarify that. 
   
   Just note that this is brittle: if the FlinkToProtoSchema code changes such that the message order changes, this will break.



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+    /**
+     * Creates a {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+     * where the schema has already been setup before-hand/exists in Confluent Schema Registry.
+     *
+     * <p>Useful in scenarios where users want to be more explicit with schemas used. In these cases
+     * the external schema specified through schemaId will take precedence for encoding/decoding
+     * data. Also, the step of registering with schemaRegistry during serialization, will be
+     * skipped.
+     *
+     * <p>A single Schema Registry Protobuf entry may contain multiple Protobuf messages, some of
+     * which may have nested messages. The messageName identifies the exact message/schema to use
+     * for serialization/deserialization. Consider the following protobuf message
+     *
+     * <pre>
+     * package test.package;
+     * message MessageA {
+     *     message MessageB {
+     *         message MessageC {
+     *         ...
+     *         }
+     *     }
+     *     message MessageD {
+     *     ...
+     *     }
+     *     message MessageE {
+     *         message MessageF {
+     *         ...
+     *         }
+     *         message MessageG {
+     *         ...
+     *         }
+     *     ...
+     *     }
+     * ...
+     * }
+     * </pre>
+     *
+     * <p>In order to use messageD the messageName should contain the value of
+     * test.package.messageD. Similarly, for messageF to be used messageName should contain
+     * test.package.MessageE.MessageF.
+     *
+     * @param schemaId SchemaId for external schema referenced for encoding/decoding of payload.
+     * @param messageName Optional message name to be used to select the right {@link
+     *     com.google.protobuf.Message} for Serialization/Deserialization. In absence of
+     *     messageName the outermost message will be used.
+     * @param schemaRegistryClient client handle to Schema Registry {@link
+     *     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+     * @return
+     */
+    public static SchemaCoder createForPreRegisteredSchema(
+            int schemaId, @Nullable String messageName, SchemaRegistryClient schemaRegistryClient) {
+        return new PreRegisteredSchemaCoder(schemaId, messageName, schemaRegistryClient);
+    }
+
+    /**
+     * Creates a default schema coder.
+     *
+     * <p>For serialization schema coder will infer the schema from Flink {@link
+     * org.apache.flink.table.types.logical.RowType}. Schema obtained from rowType will also be
+     * registered to Schema Registry using the subject passed in by invoking {@link
+     * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+     * io.confluent.kafka.schemaregistry.ParsedSchema)}.
+     *
+     * <p>For deserialization schema coder will infer schema from InputStream. In cases where
+     * messageIndexes indicate using a nested schema, the appropriate nested schema will be used.
+     *
+     * @param subject Subject to use for registering schema (only required for serialization).
+     * @param rowType Flink Row type.
+     * @param schemaRegistryClient Client for SchemaRegistry
+     * @return SchemaCoder to use.
+     */
+    public static SchemaCoder createDefault(
+            String subject, RowType rowType, SchemaRegistryClient schemaRegistryClient) {
+        return new DefaultSchemaCoder(subject, rowType, schemaRegistryClient);
+    }
+
+    /**
+     * Default implementation of SchemaCoder.
+     *
+     * <p>Parses schema information from inputStream for de-serialization. For Serialization, uses
+     * Flink Row Type to infer schema and registers this schema with Schema Registry.
+     */
+    static class DefaultSchemaCoder extends SchemaCoder {
+        private static final String ROW = "row";
+        private static final String PACKAGE = "io.confluent.generated";
+        private static final List<Integer> DEFAULT_INDEX = Collections.singletonList(0);
+        /** Subject can be nullable in case coder is only used for deserialization. */
+        private @Nullable final String subject;
+
+        private final ProtobufSchema rowSchema;
+        private final SchemaRegistryClient schemaRegistryClient;
+
+        public DefaultSchemaCoder(
+                @Nullable String subject,
+                RowType rowType,
+                SchemaRegistryClient schemaRegistryClient) {
+            this.subject = subject;
+            rowSchema =
+                    FlinkToProtoSchemaConverter.fromFlinkRowType(
+                            Preconditions.checkNotNull(rowType), ROW, PACKAGE);
+            this.schemaRegistryClient = Preconditions.checkNotNull(schemaRegistryClient);
+        }
+
+        public static MessageIndexes readMessageIndex(DataInputStream input) throws IOException {
+
+            int size = ByteUtils.readVarint(input);
+            if (size == 0) {
+                return new MessageIndexes(DEFAULT_INDEX);
+            } else {
+                List<Integer> indexes = new ArrayList<>(size);
+
+                for (int i = 0; i < size; ++i) {
+                    indexes.add(ByteUtils.readVarint(input));
+                }
+                return new MessageIndexes(indexes);
+            }
+        }
+
+        @Override
+        public ProtobufSchema readSchema(InputStream in) throws IOException {
+            DataInputStream dataInputStream = new DataInputStream(in);
+
+            if (dataInputStream.readByte() != 0) {
+                throw new IOException("Unknown data format. Magic number does not match");
+            } else {
+                int schemaId = dataInputStream.readInt();
+                try {
+                    ProtobufSchema schema =
+                            (ProtobufSchema) schemaRegistryClient.getSchemaById(schemaId);
+                    MessageIndexes indexes = readMessageIndex(dataInputStream);
+                    String name = schema.toMessageName(indexes);
+                    schema = schema.copy(name);
+                    return schema;
+                } catch (RestClientException e) {
+                    throw new IOException(
+                            format("Could not find schema with id %s in registry", schemaId), e);
+                }
+            }
+        }
+
+        @Override
+        public ProtobufSchema writerSchema() {
+            return rowSchema;
+        }
+
+        @Override
+        public void writeSchema(OutputStream out) throws IOException {
+            out.write(CONFLUENT_MAGIC_BYTE);
+            int schemaId = 0;
+            try {
+                schemaId = schemaRegistryClient.register(subject, rowSchema);
+                writeInt(out, schemaId);
+                final ByteBuffer buffer = emptyMessageIndexes();

Review Comment:
   We can set empty message indices because the dynamically generated Protobuf Message from Flink RowData will have the root message first. 👍 
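
   For reference, a minimal sketch of how that ordering assumption could be checked defensively, reusing the `MessageIndexes` and `toMessageName` calls from the quoted `DefaultSchemaCoder`; the helper class and its name are illustrative only, not part of this PR:

   ```java
   import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
   import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

   import java.util.Collections;

   final class RootMessageCheck {

       /**
        * Verifies that index [0] of the generated schema still resolves to the
        * expected root message before empty message indexes are written out.
        */
       static void assertRootMessageFirst(ProtobufSchema rowSchema, String expectedFullName) {
           String actual =
                   rowSchema.toMessageName(new MessageIndexes(Collections.singletonList(0)));
           if (!expectedFullName.equals(actual)) {
               throw new IllegalStateException(
                       "Generated schema no longer lists the root message first: " + actual);
           }
       }
   }
   ```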



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "anupamaggarwal (via GitHub)" <gi...@apache.org>.
anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1548361941


##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerializationSchema.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure with a Protobuf
+ * format.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * ProtoRegistryDeserializationSchema}.
+ */
+public class ProtoRegistrySerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private transient RowDataToProtoConverters.RowDataToProtoConverter runtimeConverter;
+
+    private transient SchemaRegistryCoder schemaCoder;
+    /** Output stream to write message to. */
+    private transient ByteArrayOutputStream arrayOutputStream;
+
+    public ProtoRegistrySerializationSchema(SchemaRegistryConfig registryConfig, RowType rowType) {
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.schemaRegistryConfig = Preconditions.checkNotNull(registryConfig);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.runtimeConverter =
+                RowDataToProtoConverters.createConverter(rowType, schema.toDescriptor());
+        this.arrayOutputStream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            final DynamicMessage converted = (DynamicMessage) runtimeConverter.convert(row);
+
+            arrayOutputStream.reset();
+            schemaCoder.writeSchema(arrayOutputStream);
+            final ByteBuffer buffer = writeMessageIndexes();
+            arrayOutputStream.write(buffer.array());
+            converted.writeTo(arrayOutputStream);
+            return arrayOutputStream.toByteArray();
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(String.format("Could not serialize row '%s'.", row), t);
+        }
+    }
+
+    private static ByteBuffer writeMessageIndexes() {
+        //write empty message indices for now
+        ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
+        ByteUtils.writeVarint(0, buffer);
+        return buffer;

Review Comment:
   thanks @klam-shop, I added logic for handling message indexes. 
   - For the case where the schemaId is specified explicitly through config, the message indexes are determined via another config parameter, message-name. They are skipped during deserialization, since the correct schema descriptor to use is determined during initialization from message-name (a small sketch follows below).
   - For the case where a pre-registered schemaId is not available, we write empty message indexes. For deserialization, the message indexes are read from the input stream to infer the right schema to use.
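
   For illustration, a minimal sketch of how the pre-registered path can resolve the target descriptor from a configured message name, assuming the `ProtobufSchema#copy` and `toDescriptor` calls already used elsewhere in this PR; the helper class and its signature are hypothetical:

   ```java
   import com.google.protobuf.Descriptors.Descriptor;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

   import javax.annotation.Nullable;

   final class PreRegisteredDescriptorResolver {

       /**
        * Resolves the descriptor for a pre-registered schema. With a configured
        * message name such as "test.package.MessageE.MessageF" the schema is
        * narrowed to that (possibly nested) message; otherwise the outermost
        * message of the registered schema is used.
        */
       static Descriptor resolve(
               SchemaRegistryClient client, int schemaId, @Nullable String messageName)
               throws Exception {
           ProtobufSchema schema = (ProtobufSchema) client.getSchemaById(schemaId);
           if (messageName != null) {
               schema = schema.copy(messageName);
           }
           return schema.toDescriptor();
       }
   }
   ```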



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #24482:
URL: https://github.com/apache/flink/pull/24482#issuecomment-1988514486

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "808ee4160daf8fb9f5f06271ab6fd280915bec92",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "808ee4160daf8fb9f5f06271ab6fd280915bec92",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 808ee4160daf8fb9f5f06271ab6fd280915bec92 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "klam-shop (via GitHub)" <gi...@apache.org>.
klam-shop commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1520345251


##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaRegistryCoder.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static java.lang.String.format;
+
+/** Reads and Writes schema using Confluent Schema Registry protocol. */
+public class SchemaRegistryCoder {

Review Comment:
   Should we reuse the existing `SchemaCoder` interface already in the Flink avro-confluent format?
   https://github.com/apache/flink/blob/2ec2a606a4f041bfa45569700f97df556f10779d/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java#L32



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistryDeserializationSchema.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.registry.confluent.utils.MutableByteArrayInputStream;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A {@link DeserializationSchema} that deserializes {@link RowData} from Protobuf messages using
+ * Schema Registry protocol.
+ */
+public class ProtoRegistryDeserializationSchema implements DeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+    private final RowType rowType;
+    private final TypeInformation<RowData> producedType;
+
+    /** Input stream to read message from. */
+    private transient MutableByteArrayInputStream inputStream;
+
+    private transient SchemaRegistryCoder schemaCoder;
+
+    private transient ProtoToRowDataConverters.ProtoToRowDataConverter runtimeConverter;
+    private transient Descriptor descriptor;
+
+    public ProtoRegistryDeserializationSchema(
+            SchemaRegistryConfig schemaRegistryConfig,
+            RowType rowType,
+            TypeInformation<RowData> producedType) {
+        this.schemaRegistryConfig = Preconditions.checkNotNull(schemaRegistryConfig);
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.producedType = Preconditions.checkNotNull(producedType);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.descriptor = schema.toDescriptor();
+        this.runtimeConverter = ProtoToRowDataConverters.createConverter(descriptor, rowType);
+        this.inputStream = new MutableByteArrayInputStream();
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        try {
+            inputStream.setBuffer(message);
+            schemaCoder.readSchema(inputStream);
+            // Not sure what the message indexes are, it is some Confluent Schema Registry Protobuf
+            // magic. Until we figure out what that is, let's skip it
+            skipMessageIndexes(inputStream);

Review Comment:
   I think this code should parse the Message Indexes in order to determine what Protobuf Message to use, in the case where the fetched schema defines multiple Messages. 
   
   (Note: it does make sense to skip the Message Indexes when passing bytes to deserialize into a DynamicMessage, but as noted above and below, we need them to determine which Protobuf Message to use.)
   
   The Message Indexes are part of the Wire format, described here:
   https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
   
   >The Protobuf serialization format appends a list of message indexes after the magic-byte and schema-id. So, the Protobuf serialization format is:
   > magic-byte, schema-id, message-indexes, protobuf-payload
   > where message-indexes is an array of indexes that corresponds to the message type (which may be nested). A single Schema Registry Protobuf entry may contain multiple Protobuf messages, some of which may have nested messages. **The role of message-indexes is to identify which Protobuf message in the Schema Registry entry to use.**
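
   As a rough illustration of that wire format (not the PR's code), a sketch that reads the header with `DataInputStream` and Kafka's `ByteUtils`, which the PR already uses; the class and method names here are made up:

   ```java
   import org.apache.kafka.common.utils.ByteUtils;

   import java.io.DataInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.ArrayList;
   import java.util.List;

   final class WireFormatHeader {

       final int schemaId;
       final List<Integer> messageIndexes;

       private WireFormatHeader(int schemaId, List<Integer> messageIndexes) {
           this.schemaId = schemaId;
           this.messageIndexes = messageIndexes;
       }

       /** Reads magic byte, schema id and message indexes; the protobuf payload follows. */
       static WireFormatHeader read(InputStream in) throws IOException {
           DataInputStream data = new DataInputStream(in);
           if (data.readByte() != 0) {
               throw new IOException("Unknown magic byte");
           }
           int schemaId = data.readInt();
           int count = ByteUtils.readVarint(data);
           List<Integer> indexes = new ArrayList<>();
           if (count == 0) {
               // Shortcut in the wire format: a single 0 stands for index list [0],
               // i.e. the first/outermost message of the registered schema.
               indexes.add(0);
           } else {
               for (int i = 0; i < count; i++) {
                   indexes.add(ByteUtils.readVarint(data));
               }
           }
           return new WireFormatHeader(schemaId, indexes);
       }
   }
   ```

   After `WireFormatHeader.read(in)` the stream would be positioned at the protobuf payload, which is what the deserializer then parses with the resolved descriptor.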
   



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerializationSchema.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure with a Protobuf
+ * format.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * ProtoRegistryDeserializationSchema}.
+ */
+public class ProtoRegistrySerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private transient RowDataToProtoConverters.RowDataToProtoConverter runtimeConverter;
+
+    private transient SchemaRegistryCoder schemaCoder;
+    /** Output stream to write message to. */
+    private transient ByteArrayOutputStream arrayOutputStream;
+
+    public ProtoRegistrySerializationSchema(SchemaRegistryConfig registryConfig, RowType rowType) {
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.schemaRegistryConfig = Preconditions.checkNotNull(registryConfig);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.runtimeConverter =
+                RowDataToProtoConverters.createConverter(rowType, schema.toDescriptor());
+        this.arrayOutputStream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            final DynamicMessage converted = (DynamicMessage) runtimeConverter.convert(row);
+
+            arrayOutputStream.reset();
+            schemaCoder.writeSchema(arrayOutputStream);
+            final ByteBuffer buffer = writeMessageIndexes();
+            arrayOutputStream.write(buffer.array());
+            converted.writeTo(arrayOutputStream);
+            return arrayOutputStream.toByteArray();
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(String.format("Could not serialize row '%s'.", row), t);
+        }
+    }
+
+    private static ByteBuffer writeMessageIndexes() {
+        //write empty message indices for now
+        ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
+        ByteUtils.writeVarint(0, buffer);
+        return buffer;

Review Comment:
   Similar to other comment, I think this code should handle Message Indexes properly to support schemas with multiple Messages. 
   
   Refer to the wire format for protobuf:
   https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
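
   A hedged sketch of the serialization-side counterpart, encoding a message-index list as varints with the same `ByteUtils` helpers the PR uses; the helper class is illustrative, and the single-zero shortcut mirrors how the Confluent wire format encodes the index list [0]:

   ```java
   import org.apache.kafka.common.utils.ByteUtils;

   import java.nio.ByteBuffer;
   import java.util.List;

   final class MessageIndexWriter {

       /** Encodes a message-index list as varints, using the [0] shortcut. */
       static byte[] encode(List<Integer> indexes) {
           if (indexes.size() == 1 && indexes.get(0) == 0) {
               // The common case of the outermost message is encoded as a single 0.
               ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
               ByteUtils.writeVarint(0, buffer);
               return buffer.array();
           }
           int size = ByteUtils.sizeOfVarint(indexes.size());
           for (int index : indexes) {
               size += ByteUtils.sizeOfVarint(index);
           }
           ByteBuffer buffer = ByteBuffer.allocate(size);
           ByteUtils.writeVarint(indexes.size(), buffer);
           for (int index : indexes) {
               ByteUtils.writeVarint(index, buffer);
           }
           return buffer.array();
       }
   }
   ```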



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

Posted by "anupamaggarwal (via GitHub)" <gi...@apache.org>.
anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1564640169


##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+    /**
+     * Creates a {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+     * where the schema has already been setup before-hand/exists in Confluent Schema Registry.
+     *
+     * <p>Useful in scenarios where users want to be more explicit with schemas used. In these cases
+     * the external schema specified through schemaId will take precedence for encoding/decoding
+     * data. Also, the step of registering with schemaRegistry during serialization, will be
+     * skipped.
+     *
+     * <p>A single Schema Registry Protobuf entry may contain multiple Protobuf messages, some of
+     * which may have nested messages. The messageName identifies the exact message/schema to use
+     * for serialization/deserialization. Consider the following protobuf message
+     *
+     * <pre>
+     * package test.package;
+     * message MessageA {
+     *     message MessageB {
+     *         message MessageC {
+     *         ...
+     *         }
+     *     }
+     *     message MessageD {
+     *     ...
+     *     }
+     *     message MessageE {
+     *         message MessageF {
+     *         ...
+     *         }
+     *         message MessageG {
+     *         ...
+     *         }
+     *     ...
+     *     }
+     * ...
+     * }
+     * </pre>
+     *
+     * <p>In order to use messageD the messageName should contain the value of
+     * test.package.messageD. Similarly, for messageF to be used messageName should contain
+     * test.package.MessageE.MessageF.
+     *
+     * @param schemaId SchemaId for external schema referenced for encoding/decoding of payload.
+     * @param messageName Optional message name to be used to select the right {@link
+     *     com.google.protobuf.Message} for Serialization/Deserialization. In absence of
+     *     messageName the outermost message will be used.
+     * @param schemaRegistryClient client handle to Schema Registry {@link
+     *     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+     * @return
+     */
+    public static SchemaCoder createForPreRegisteredSchema(
+            int schemaId, @Nullable String messageName, SchemaRegistryClient schemaRegistryClient) {
+        return new PreRegisteredSchemaCoder(schemaId, messageName, schemaRegistryClient);
+    }
+
+    /**
+     * Creates a default schema coder.
+     *
+     * <p>For serialization schema coder will infer the schema from Flink {@link
+     * org.apache.flink.table.types.logical.RowType}. Schema obtained from rowType will also be
+     * registered to Schema Registry using the subject passed in by invoking {@link
+     * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+     * io.confluent.kafka.schemaregistry.ParsedSchema)}.
+     *
+     * <p>For deserialization schema coder will infer schema from InputStream. In cases where
+     * messageIndexes indicate using a nested schema, the appropriate nested schema will be used.
+     *
+     * @param subject Subject to use for registering schema (only required for serialization).
+     * @param rowType Flink Row type.
+     * @param schemaRegistryClient Client for SchemaRegistry
+     * @return SchemaCoder to use.
+     */
+    public static SchemaCoder createDefault(
+            String subject, RowType rowType, SchemaRegistryClient schemaRegistryClient) {
+        return new DefaultSchemaCoder(subject, rowType, schemaRegistryClient);
+    }
+
+    /**
+     * Default implementation of SchemaCoder.
+     *
+     * <p>Parses schema information from inputStream for de-serialization. For Serialization, uses
+     * Flink Row Type to infer schema and registers this schema with Schema Registry.
+     */
+    static class DefaultSchemaCoder extends SchemaCoder {
+        private static final String ROW = "row";
+        private static final String PACKAGE = "io.confluent.generated";
+        private static final List<Integer> DEFAULT_INDEX = Collections.singletonList(0);
+        /** Subject can be nullable in case coder is only used for deserialization. */
+        private @Nullable final String subject;
+
+        private final ProtobufSchema rowSchema;
+        private final SchemaRegistryClient schemaRegistryClient;
+
+        public DefaultSchemaCoder(
+                @Nullable String subject,
+                RowType rowType,
+                SchemaRegistryClient schemaRegistryClient) {
+            this.subject = subject;
+            rowSchema =
+                    FlinkToProtoSchemaConverter.fromFlinkRowType(
+                            Preconditions.checkNotNull(rowType), ROW, PACKAGE);
+            this.schemaRegistryClient = Preconditions.checkNotNull(schemaRegistryClient);
+        }
+
+        public static MessageIndexes readMessageIndex(DataInputStream input) throws IOException {
+
+            int size = ByteUtils.readVarint(input);
+            if (size == 0) {
+                return new MessageIndexes(DEFAULT_INDEX);
+            } else {
+                List<Integer> indexes = new ArrayList<>(size);
+
+                for (int i = 0; i < size; ++i) {
+                    indexes.add(ByteUtils.readVarint(input));
+                }
+                return new MessageIndexes(indexes);
+            }
+        }
+
+        @Override
+        public ProtobufSchema readSchema(InputStream in) throws IOException {
+            DataInputStream dataInputStream = new DataInputStream(in);
+
+            if (dataInputStream.readByte() != 0) {
+                throw new IOException("Unknown data format. Magic number does not match");
+            } else {
+                int schemaId = dataInputStream.readInt();
+                try {
+                    ProtobufSchema schema =
+                            (ProtobufSchema) schemaRegistryClient.getSchemaById(schemaId);
+                    MessageIndexes indexes = readMessageIndex(dataInputStream);
+                    String name = schema.toMessageName(indexes);
+                    schema = schema.copy(name);
+                    return schema;
+                } catch (RestClientException e) {
+                    throw new IOException(
+                            format("Could not find schema with id %s in registry", schemaId), e);
+                }
+            }
+        }
+
+        @Override
+        public ProtobufSchema writerSchema() {
+            return rowSchema;
+        }
+
+        @Override
+        public void writeSchema(OutputStream out) throws IOException {
+            out.write(CONFLUENT_MAGIC_BYTE);
+            int schemaId = 0;
+            try {
+                schemaId = schemaRegistryClient.register(subject, rowSchema);
+                writeInt(out, schemaId);
+                final ByteBuffer buffer = emptyMessageIndexes();

Review Comment:
   Added a comment [here](https://github.com/anupamaggarwal/flink/blob/d0f410636fcf608fecbf530e8dbe6be3ef3e7d02/flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java#L147)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org