Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/11 04:13:28 UTC

[GitHub] [flink] libenchao commented on a diff in pull request #14376: [FLINK-18202][PB format] New Format of protobuf

libenchao commented on code in PR #14376:
URL: https://github.com/apache/flink/pull/14376#discussion_r917351320


##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatConfig.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.formats.protobuf.PbFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.protobuf.PbFormatOptions.READ_DEFAULT_VALUES;
+import static org.apache.flink.formats.protobuf.PbFormatOptions.WRITE_NULL_STRING_LITERAL;
+
+/** Config of protobuf configs. */
+public class PbFormatConfig implements Serializable {

Review Comment:
   1. Provide a `serialVersionUID` for `Serializable` classes.
   2. Do you think we should separate the 'read' and 'write' configs? We could either group them into separate sections in the field declarations, or use different classes for the reader and the writer.
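
   A minimal sketch of both points, not the PR's actual code. `ExampleFormatConfig` is a hypothetical stand-in for `PbFormatConfig`; its fields mirror the option names imported above, and the `roundTrip` helper is added here only to show that the class serializes cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for PbFormatConfig; names are illustrative assumptions. */
final class ExampleFormatConfig implements Serializable {
    // Explicit version id, so recompiling the class does not change its serialized identity.
    private static final long serialVersionUID = 1L;

    // ---- read (deserialization) options ----
    private final boolean ignoreParseErrors;
    private final boolean readDefaultValues;

    // ---- write (serialization) options ----
    private final String writeNullStringLiterals;

    ExampleFormatConfig(
            boolean ignoreParseErrors,
            boolean readDefaultValues,
            String writeNullStringLiterals) {
        this.ignoreParseErrors = ignoreParseErrors;
        this.readDefaultValues = readDefaultValues;
        this.writeNullStringLiterals = writeNullStringLiterals;
    }

    boolean isIgnoreParseErrors() {
        return ignoreParseErrors;
    }

    boolean isReadDefaultValues() {
        return readDefaultValues;
    }

    String getWriteNullStringLiterals() {
        return writeNullStringLiterals;
    }

    /** Serializes and deserializes the config in memory (illustration only). */
    static ExampleFormatConfig roundTrip(ExampleFormatConfig config) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(config);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ExampleFormatConfig) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Config is not serializable", e);
        }
    }
}
```

   Grouping the fields by direction keeps a single class; splitting it into separate reader and writer classes would be the heavier alternative mentioned in point 2.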



##########
flink-formats/flink-protobuf/pom.xml:
##########
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-protobuf</artifactId>
+	<name>Flink : Formats : Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<!-- the same with flink-table/pom.xml -->
+		<janino.version>3.0.11</janino.version>
+	</properties>
+
+	<dependencies>
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>${protoc.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.codehaus.janino</groupId>
+			<artifactId>janino</artifactId>
+			<!-- this should be the same version of flink-table module -->
+			<version>${janino.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+		<!-- JSON RowData schema test dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Filesystem format factory ITCase test dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>

Review Comment:
   You added `flink-table-planner` twice.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.util;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializeFactory;
+import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Codegen utils only used in protobuf format. */
+public class PbCodegenUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(PbCodegenUtils.class);
+
+    /**
+     * @param flinkContainerCode code phrase which represent flink container type like row/array in
+     *     codegen sections
+     * @param index the index number in flink container type
+     * @param eleType the element type
+     */
+    public static String flinkContainerElementCode(
+            String flinkContainerCode, String index, LogicalType eleType) {
+        switch (eleType.getTypeRoot()) {
+            case INTEGER:
+                return flinkContainerCode + ".getInt(" + index + ")";
+            case BIGINT:
+                return flinkContainerCode + ".getLong(" + index + ")";
+            case FLOAT:
+                return flinkContainerCode + ".getFloat(" + index + ")";
+            case DOUBLE:
+                return flinkContainerCode + ".getDouble(" + index + ")";
+            case BOOLEAN:
+                return flinkContainerCode + ".getBoolean(" + index + ")";
+            case VARCHAR:
+            case CHAR:
+                return flinkContainerCode + ".getString(" + index + ")";
+            case VARBINARY:
+            case BINARY:
+                return flinkContainerCode + ".getBinary(" + index + ")";
+            case ROW:
+                int size = eleType.getChildren().size();
+                return flinkContainerCode + ".getRow(" + index + ", " + size + ")";
+            case MAP:
+                return flinkContainerCode + ".getMap(" + index + ")";
+            case ARRAY:
+                return flinkContainerCode + ".getArray(" + index + ")";
+            default:
+                throw new IllegalArgumentException("Unsupported data type in schema: " + eleType);
+        }
+    }
+
+    /**
+     * Get java type str from {@link FieldDescriptor} which directly fetched from protobuf object.
+     *
+     * @return The returned code phrase will be used as java type str in codegen sections.
+     * @throws PbCodegenException
+     */
+    public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, String outerPrefix)
+            throws PbCodegenException {
+        String typeStr;
+        switch (fd.getJavaType()) {
+            case MESSAGE:
+                if (fd.isMapField()) {
+                    // map
+                    FieldDescriptor keyFd =
+                            fd.getMessageType().findFieldByName(PbConstant.PB_MAP_KEY_NAME);
+                    FieldDescriptor valueFd =
+                            fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
+                    // key and value cannot be repeated
+                    String keyTypeStr = getTypeStrFromProto(keyFd, false, outerPrefix);
+                    String valueTypeStr = getTypeStrFromProto(valueFd, false, outerPrefix);
+                    typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
+                } else {
+                    // simple message
+                    typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
+                }
+                break;
+            case INT:
+                typeStr = "Integer";
+                break;
+            case LONG:
+                typeStr = "Long";
+                break;
+            case STRING:
+                typeStr = "String";
+                break;
+            case ENUM:
+                typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
+                break;
+            case FLOAT:
+                typeStr = "Float";
+                break;
+            case DOUBLE:
+                typeStr = "Double";
+                break;
+            case BYTE_STRING:
+                typeStr = "ByteString";
+                break;
+            case BOOLEAN:
+                typeStr = "Boolean";
+                break;
+            default:
+                throw new PbCodegenException("do not support field type: " + fd.getJavaType());
+        }
+        if (isList) {
+            return "List<" + typeStr + ">";
+        } else {
+            return typeStr;
+        }
+    }
+
+    /**
+     * Get java type str from {@link LogicalType} which directly fetched from flink type.
+     *
+     * @return The returned code phrase will be used as java type str in codegen sections.
+     */
+    public static String getTypeStrFromLogicType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case INTEGER:
+                return "int";
+            case BIGINT:
+                return "long";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case BOOLEAN:
+                return "boolean";
+            case VARCHAR:
+            case CHAR:
+                return "StringData";
+            case VARBINARY:
+            case BINARY:
+                return "byte[]";
+            case ROW:
+                return "RowData";
+            case MAP:
+                return "MapData";
+            case ARRAY:
+                return "ArrayData";
+            default:
+                throw new IllegalArgumentException("Unsupported data type in schema: " + type);
+        }
+    }
+
+    /**
+     * Get protobuf default value from {@link FieldDescriptor}.
+     *
+     * @return The java code phrase which represents default value calculation.
+     */
+    public static String pbDefaultValueCode(
+            FieldDescriptor fieldDescriptor, PbFormatContext pbFormatContext)
+            throws PbCodegenException {
+        String outerPrefix = pbFormatContext.getOuterPrefix();
+        String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
+        switch (fieldDescriptor.getJavaType()) {
+            case MESSAGE:
+                return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
+                        + ".getDefaultInstance()";
+            case INT:
+                return "0";
+            case LONG:
+                return "0L";
+            case STRING:
+                return "\"" + nullLiteral + "\"";

Review Comment:
   Do you think we need to handle special characters that require escaping here?
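
   To illustrate the concern: if the configured null literal contains a `"` or a `\`, concatenating it between two quotes produces generated source that does not compile, or that compiles to a different string. One possible approach, sketched below, is to escape the value before embedding it. `JavaLiteralEscaper` is a hypothetical helper, not an existing Flink or Janino utility, and it assumes the code generator's compiler accepts standard Java escape sequences.

```java
/** Hypothetical helper that turns an arbitrary string into a Java string literal. */
final class JavaLiteralEscaper {
    private JavaLiteralEscaper() {}

    /** Escapes the characters that cannot appear verbatim inside a Java string literal. */
    static String escape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '\\':
                    sb.append("\\\\");
                    break;
                case '"':
                    sb.append("\\\"");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20 || c > 0x7e) {
                        // Remaining control and non-ASCII characters become unicode escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Wraps the escaped value in double quotes, ready to paste into generated code. */
    static String toLiteral(String raw) {
        return "\"" + escape(raw) + "\"";
    }
}
```

   With such a helper, the `STRING` case could return `JavaLiteralEscaper.toLiteral(nullLiteral)` instead of concatenating the quotes by hand.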



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatFactory.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Table format factory for providing configured instances of Protobuf to RowData {@link
+ * SerializationSchema} and {@link DeserializationSchema}.
+ */
+public class PbFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "protobuf";
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        return new PbDecodingFormat(buildConfig(formatOptions));
+    }
+
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        return new PbEncodingFormat(buildConfig(formatOptions));
+    }
+
+    private static PbFormatConfig buildConfig(ReadableConfig formatOptions) {
+        PbFormatConfig.PbFormatConfigBuilder configBuilder =
+                new PbFormatConfig.PbFormatConfigBuilder();
+        configBuilder.messageClassName(formatOptions.get(PbFormatOptions.MESSAGE_CLASS_NAME));
+        formatOptions
+                .getOptional(PbFormatOptions.IGNORE_PARSE_ERRORS)
+                .ifPresent(configBuilder::ignoreParseErrors);
+        formatOptions
+                .getOptional(PbFormatOptions.READ_DEFAULT_VALUES)
+                .ifPresent(configBuilder::readDefaultValues);
+        formatOptions
+                .getOptional(PbFormatOptions.WRITE_NULL_STRING_LITERAL)
+                .ifPresent(configBuilder::writeNullStringLiterals);
+        return configBuilder.build();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> result = new HashSet<>();
+        result.add(PbFormatOptions.MESSAGE_CLASS_NAME);
+        return result;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {

Review Comment:
   Add `PbFormatOptions#WRITE_NULL_STRING_LITERAL` to the optional options set. Also add some tests to verify that it works as expected.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenUtils;
+import org.apache.flink.formats.protobuf.util.PbFormatUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * {@link ProtoToRowConverter} can convert binary protobuf message data to flink row data by codegen
+ * process.
+ */
+public class ProtoToRowConverter {
+    private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
+    private final Method parseFromMethod;
+    private final Method decodeMethod;
+
+    public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
+            throws PbCodegenException {
+        try {
+            String outerPrefix =
+                    PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
+            Descriptors.Descriptor descriptor =
+                    PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
+            Class<?> messageClass =
+                    Class.forName(
+                            formatConfig.getMessageClassName(),
+                            true,
+                            Thread.currentThread().getContextClassLoader());
+            String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
+            if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
+                // pb3 always read default values
+                formatConfig =
+                        new PbFormatConfig(
+                                formatConfig.getMessageClassName(),
+                                formatConfig.isIgnoreParseErrors(),
+                                true,
+                                formatConfig.getWriteNullStringLiterals());
+            }
+            PbCodegenAppender codegenAppender = new PbCodegenAppender();
+            PbFormatContext pbFormatContext = new PbFormatContext(outerPrefix, formatConfig);
+            String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
+            String generatedClassName = "GeneratedProtoToRow_" + uuid;
+            String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
+            codegenAppender.appendLine("package " + generatedPackageName);
+            codegenAppender.appendLine("import " + RowData.class.getName());
+            codegenAppender.appendLine("import " + ArrayData.class.getName());
+            codegenAppender.appendLine("import " + BinaryStringData.class.getName());
+            codegenAppender.appendLine("import " + GenericRowData.class.getName());
+            codegenAppender.appendLine("import " + GenericMapData.class.getName());
+            codegenAppender.appendLine("import " + GenericArrayData.class.getName());
+            codegenAppender.appendLine("import " + ArrayList.class.getName());
+            codegenAppender.appendLine("import " + List.class.getName());
+            codegenAppender.appendLine("import " + Map.class.getName());
+            codegenAppender.appendLine("import " + HashMap.class.getName());
+            codegenAppender.appendLine("import " + ByteString.class.getName());
+
+            codegenAppender.appendSegment("public class " + generatedClassName + "{");
+            codegenAppender.appendSegment(
+                    "public static RowData "
+                            + PbConstant.GENERATED_DECODE_METHOD
+                            + "("
+                            + fullMessageClassName
+                            + " message){");
+            codegenAppender.appendLine("RowData rowData=null");
+            PbCodegenDeserializer codegenDes =
+                    PbCodegenDeserializeFactory.getPbCodegenTopRowDes(
+                            descriptor, rowType, pbFormatContext);
+            String genCode = codegenDes.codegen("rowData", "message", 0);
+            codegenAppender.appendSegment(genCode);
+            codegenAppender.appendLine("return rowData");
+            codegenAppender.appendSegment("}");
+            codegenAppender.appendSegment("}");
+
+            String printCode = codegenAppender.printWithLineNumber();
+            Files.write(new File("/tmp/printCode").toPath(), printCode.getBytes());

Review Comment:
   What's this line for?



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbSchemaValidatorUtils.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.util;
+
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+
+/** Validation class to verify protobuf definition and flink schema. */
+public class PbSchemaValidatorUtils {

Review Comment:
   `PbSchemaValidatorUtils` -> `PbSchemaValidationUtils`



##########
flink-formats/flink-sql-protobuf/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-sql-protobuf</artifactId>
+	<name>Flink : Formats : SQL Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-parquet</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-protobuf</include>
+									<include>com.google.protobuf:protobuf-java</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- skip dependency convergence due to Hadoop dependency -->

Review Comment:
   Do you need to do this for `flink-sql-protobuf`?



##########
flink-formats/flink-protobuf/pom.xml:
##########
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-protobuf</artifactId>
+	<name>Flink : Formats : Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<!-- the same with flink-table/pom.xml -->
+		<janino.version>3.0.11</janino.version>
+	</properties>
+
+	<dependencies>
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>${protoc.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.codehaus.janino</groupId>
+			<artifactId>janino</artifactId>
+			<!-- this should be the same version of flink-table module -->
+			<version>${janino.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+		<!-- JSON RowData schema test dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Filesystem format factory ITCase test dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<!-- test utils dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>com.github.os72</groupId>
+				<artifactId>protoc-jar-maven-plugin</artifactId>
+				<version>3.11.4</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<protocVersion>${protoc.version}</protocVersion>

Review Comment:
   I see that the generated proto classes are packaged into the jar. Could you specify the 'outputDirectories' so that they are generated into the test directory instead?



##########
flink-formats/flink-protobuf/pom.xml:
##########
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-protobuf</artifactId>
+	<name>Flink : Formats : Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<!-- the same with flink-table/pom.xml -->
+		<janino.version>3.0.11</janino.version>
+	</properties>
+
+	<dependencies>
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>${protoc.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.codehaus.janino</groupId>
+			<artifactId>janino</artifactId>
+			<!-- this should be the same version of flink-table module -->
+			<version>${janino.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+		<!-- JSON RowData schema test dependency -->

Review Comment:
   This comment is wrong: it mentions JSON, but this is the protobuf format module.



##########
flink-formats/flink-sql-protobuf/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-sql-protobuf</artifactId>
+	<name>Flink : Formats : SQL Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-parquet</artifactId>

Review Comment:
   This dependency is wrong (`flink-parquet` in the protobuf SQL jar). Could you also check that your bundled SQL jar works well, both in the user classloader and in the parent classloader?



##########
flink-formats/flink-protobuf/pom.xml:
##########
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-protobuf</artifactId>
+	<name>Flink : Formats : Protobuf</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<!-- the same with flink-table/pom.xml -->
+		<janino.version>3.0.11</janino.version>
+	</properties>
+
+	<dependencies>
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>${protoc.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.codehaus.janino</groupId>
+			<artifactId>janino</artifactId>
+			<!-- this should be the same version of flink-table module -->
+			<version>${janino.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+		<!-- JSON RowData schema test dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Filesystem format factory ITCase test dependency -->

Review Comment:
   The comment above is wrong.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.util.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+/** Serializer to convert flink simple type data to proto simple type object. */
+public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
+    private final Descriptors.FieldDescriptor fd;
+    private final LogicalType type;
+    private final PbFormatContext formatContext;
+
+    public PbCodegenSimpleSerializer(
+            Descriptors.FieldDescriptor fd, LogicalType type, PbFormatContext formatContext) {
+        this.fd = fd;
+        this.type = type;
+        this.formatContext = formatContext;
+    }
+
+    @Override
+    public String codegen(String resultVar, String flinkObjectCode, int indent) throws PbCodegenException {

Review Comment:
   This line is too long. (PS: you can manually trigger the CI after pushing new code changes.)



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.util;
+
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+    /**
+     * protobuf code has a bug that, f_abc_7d will be converted to fAbc7d in {@link
+     * Descriptors.FieldDescriptor#getJsonName()}, but actually we need fAbc7D.
+     */
+    public static String fieldNameToJsonName(String name) {

Review Comment:
   Consider the field name `vpr6s`: your method transforms it to `getVpr6s`, but the actual generated method is `getVpr6S`.  
   Hence I'm wondering whether there is a more general way to do this name conversion. E.g., does 'protobuf-java' have any util method for it, or is there a third-party library that could do this?
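
   As a rough sketch of what a more general conversion could look like: the code below assumes that protoc's Java generator upper-cases the letter following an underscore or a digit, which matches both the `vpr6s` case and the `f_abc_7d` case from the Javadoc. `CamelCaseSketch`, `underscoresToCamelCase` and `getterName` are hypothetical names for illustration, not an existing protobuf-java API, and the rule should be checked against the generator before relying on it.

```java
/**
 * Hypothetical helper (not part of this PR or of protobuf-java): a sketch of the
 * camel-case rule that protoc's Java generator is assumed to follow.
 */
final class CamelCaseSketch {

    private CamelCaseSketch() {}

    /**
     * Converts a proto field name to camel case.
     *
     * @param input proto field name, e.g. "f_abc_7d"
     * @param capFirstLetter whether the first letter should be upper-cased
     */
    static String underscoresToCamelCase(String input, boolean capFirstLetter) {
        StringBuilder result = new StringBuilder(input.length());
        boolean capNext = capFirstLetter;
        for (int i = 0; i < input.length(); i++) {
            char ch = input.charAt(i);
            if ('a' <= ch && ch <= 'z') {
                result.append(capNext ? Character.toUpperCase(ch) : ch);
                capNext = false;
            } else if ('A' <= ch && ch <= 'Z') {
                // Only a leading capital is lowered; later capitals are kept as-is.
                result.append((i == 0 && !capNext) ? Character.toLowerCase(ch) : ch);
                capNext = false;
            } else if ('0' <= ch && ch <= '9') {
                // Digits are kept, and the letter after a digit is upper-cased.
                result.append(ch);
                capNext = true;
            } else {
                // Underscores and any other characters are dropped.
                capNext = true;
            }
        }
        return result.toString();
    }

    /** Builds the getter name for a singular field, e.g. "vpr6s" -> "getVpr6S". */
    static String getterName(String fieldName) {
        return "get" + underscoresToCamelCase(fieldName, true);
    }

    public static void main(String[] args) {
        System.out.println(getterName("vpr6s"));                       // getVpr6S
        System.out.println(underscoresToCamelCase("f_abc_7d", false)); // fAbc7D
    }
}
```

   With a single rule like this, the digit case needs no special handling on top of `getJsonName()`; whether it covers every corner case of the generated code (e.g. repeated or map fields, reserved names) would still need tests against real generated classes.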


