Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/23 13:10:13 UTC

[GitHub] [flink] libenchao commented on a diff in pull request #14376: [FLINK-18202][PB format] New Format of protobuf

libenchao commented on code in PR #14376:
URL: https://github.com/apache/flink/pull/14376#discussion_r904965759


##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenException;
+
+/**
+ * {@link PbCodegenDeserializer} is responsible for converting protobuf object to flink internal
+ * data by codegen process. The codegen procedure could be considered as
+ *
+ * <PRE>{@code returnVarName = codegen(pbGetStr) }
+ * </PRE>
+ */
+public interface PbCodegenDeserializer {
+    /**
+     * @param returnInternalDataVarName the final var name that is calculated by codegen. This var
+     *     name will be used by outsider codegen environment. {@code returnInternalDataVarName}
+     *     should be flink data object
+     * @param pbGetStr may be a variable or expression. Current codegen environment can use this
+     *     literal name directly to access the input. {@code pbGetStr} is a value coming from
+     *     protobuf object
+     * @return The java code generated
+     */
+    String codegen(String returnInternalDataVarName, String pbGetStr) throws PbCodegenException;

Review Comment:
   When reading the code, these two names confused me for a while. How about naming them `resultVariable` and `pbObject`?
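
   For illustration, a minimal sketch of how the interface could read with the proposed names (`resultVariable` and `pbObject` are only the reviewer's suggestion here, not the committed API):
   ```java
   package org.apache.flink.formats.protobuf.deserialize;

   import org.apache.flink.formats.protobuf.PbCodegenException;

   /** Sketch: same contract as PbCodegenDeserializer, with clearer parameter names. */
   public interface PbCodegenDeserializer {
       /**
        * @param resultVariable name of the variable that receives the converted Flink internal
        *     data object; the surrounding codegen environment reads this variable afterwards
        * @param pbObject variable name or expression evaluating to the protobuf value to convert
        * @return the generated Java code fragment
        */
       String codegen(String resultVariable, String pbObject) throws PbCodegenException;
   }
   ```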



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+
+/** Deserializer to convert proto message type object to flink row type data. */
+public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
+    private final Descriptor descriptor;
+    private final RowType rowType;
+    private final PbFormatContext formatContext;
+
+    public PbCodegenRowDeserializer(
+            Descriptor descriptor, RowType rowType, PbFormatContext formatContext) {
+        this.rowType = rowType;
+        this.descriptor = descriptor;
+        this.formatContext = formatContext;
+    }
+
+    @Override
+    public String codegen(String returnInternalDataVarName, String pbGetStr)
+            throws PbCodegenException {
+        // The type of messageGetStr is a native pb object,

Review Comment:
   `messageGetStr` does not exist. I guess this should be `pbGetStr`? There are other places with the same problem; please fix them as well.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.Descriptors;
+
+/**
+ * Serialization schema from Flink to Protobuf types.
+ *
+ * <p>Serializes a {@link RowData } to protobuf binary data.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link FlinkRuntimeException}.
+ */
+public class PbRowDataSerializationSchema implements SerializationSchema<RowData> {

Review Comment:
   Add a `serialVersionUID` to it. (As a general rule, we always prefer to explicitly declare a `serialVersionUID` for `Serializable` classes.)
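
   A minimal, self-contained illustration of the rule (the class name and the value `1L` are placeholders; in the reviewed class the field would simply be added next to the existing fields):
   ```java
   import java.io.Serializable;

   /** Sketch: every Serializable class declares its serialVersionUID explicitly. */
   public class ExplicitVersionExample implements Serializable {
       // Prevents the JVM from deriving an implicit id that changes whenever the class changes.
       private static final long serialVersionUID = 1L;
   }
   ```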



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodegenAppender.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+/** Helper class which do code fragment concat. */
+public class PbCodegenAppender {
+    private StringBuilder sb;
+
+    public PbCodegenAppender() {
+        sb = new StringBuilder();
+    }
+
+    public void appendLine(String code) {
+        sb.append(code + ";\n");

Review Comment:
   How about using two appends: `sb.append(code).append(";\n")`? (As far as I can see, this saves one String concatenation.)
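
   A sketch of the appender with the chained appends applied (and the `final` field suggested elsewhere in this review):
   ```java
   /** Sketch: PbCodegenAppender variant that avoids intermediate String concatenation. */
   public class ChainedAppendExample {
       private final StringBuilder sb = new StringBuilder();

       public void appendLine(String code) {
           // Chained appends; no temporary concatenated String is created per call.
           sb.append(code).append(";\n");
       }

       public void appendSegment(String code) {
           sb.append(code).append("\n");
       }

       public String code() {
           return sb.toString();
       }
   }
   ```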



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+
+/** Validation class to verify protobuf definition and flink schema. */
+public class PbSchemaValidator {
+    private Descriptors.Descriptor descriptor;
+    private RowType rowType;
+
+    private static final EnumMap<JavaType, EnumSet<LogicalTypeRoot>> TYPE_MATCH_MAP =
+            new EnumMap(JavaType.class);
+
+    static {
+        TYPE_MATCH_MAP.put(JavaType.BOOLEAN, EnumSet.of(LogicalTypeRoot.BOOLEAN));
+        TYPE_MATCH_MAP.put(
+                JavaType.BYTE_STRING,
+                EnumSet.of(LogicalTypeRoot.BINARY, LogicalTypeRoot.VARBINARY));
+        TYPE_MATCH_MAP.put(JavaType.DOUBLE, EnumSet.of(LogicalTypeRoot.DOUBLE));
+        TYPE_MATCH_MAP.put(JavaType.FLOAT, EnumSet.of(LogicalTypeRoot.FLOAT));
+        TYPE_MATCH_MAP.put(
+                JavaType.ENUM,
+                EnumSet.of(
+                        LogicalTypeRoot.VARCHAR,
+                        LogicalTypeRoot.CHAR,
+                        LogicalTypeRoot.TINYINT,
+                        LogicalTypeRoot.SMALLINT,
+                        LogicalTypeRoot.INTEGER,
+                        LogicalTypeRoot.BIGINT));
+        TYPE_MATCH_MAP.put(
+                JavaType.STRING, EnumSet.of(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.CHAR));
+        TYPE_MATCH_MAP.put(JavaType.INT, EnumSet.of(LogicalTypeRoot.INTEGER));
+        TYPE_MATCH_MAP.put(JavaType.LONG, EnumSet.of(LogicalTypeRoot.BIGINT));
+    }
+
+    public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType rowType) {

Review Comment:
   We don't need to instantiate an object here; a static method should be enough?
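
   A rough sketch of what a static entry point could look like (the method shape is only an assumption for illustration; the field-by-field checks stay as in the PR):
   ```java
   import org.apache.flink.table.types.logical.RowType;

   import com.google.protobuf.Descriptors;

   /** Sketch: validator exposed as a static method instead of an instantiable class. */
   public final class StaticValidatorExample {

       private StaticValidatorExample() {
           // no instances needed
       }

       public static void validate(Descriptors.Descriptor descriptor, RowType rowType) {
           validateTypeMatch(descriptor, rowType);
       }

       private static void validateTypeMatch(Descriptors.Descriptor descriptor, RowType rowType) {
           // same field-by-field type checks as in the reviewed PbSchemaValidator
       }
   }
   ```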



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+
+/** Validation class to verify protobuf definition and flink schema. */
+public class PbSchemaValidator {
+    private Descriptors.Descriptor descriptor;
+    private RowType rowType;
+
+    private static final EnumMap<JavaType, EnumSet<LogicalTypeRoot>> TYPE_MATCH_MAP =
+            new EnumMap(JavaType.class);

Review Comment:
   Another general rule: we discourage using raw types.
   ```suggestion
               new EnumMap<>(JavaType.class);
   ```



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSerializeFactory.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+
+/** Codegen factory class which return {@link PbCodegenSerializer} of different data type. */
+public class PbCodegenSerializeFactory {
+    public static PbCodegenSerializer getPbCodegenSer(
+            Descriptors.FieldDescriptor fd, LogicalType type, PbFormatContext formatContext)
+            throws PbCodegenException {
+        if (type instanceof RowType) {
+            return new PbCodegenRowSerializer(fd.getMessageType(), (RowType) type, formatContext);
+        } else if (PbFormatUtils.isSimpleType(type)) {
+            return new PbCodegenSimpleSerializer(fd, type, formatContext);
+        } else if (type instanceof ArrayType) {
+            return new PbCodegenArraySerializer(
+                    fd, ((ArrayType) type).getElementType(), formatContext);
+        } else if (type instanceof MapType) {
+            return new PbCodegenMapSerializer(fd, (MapType) type, formatContext);
+        } else {
+            throw new PbCodegenException("Cannot support flink data type: " + type);

Review Comment:
   ```suggestion
               throw new PbCodegenException("Do not support data type: " + type);
   ```



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodegenAppender.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+/** Helper class which do code fragment concat. */
+public class PbCodegenAppender {
+    private StringBuilder sb;
+
+    public PbCodegenAppender() {
+        sb = new StringBuilder();
+    }
+
+    public void appendLine(String code) {
+        sb.append(code + ";\n");
+    }
+
+    public void appendSegment(String code) {
+        sb.append(code + "\n");
+    }
+
+    public String code() {
+        return sb.toString();
+    }
+
+    public static String printWithLineNumber(String code) {

Review Comment:
   This method is never used?



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodegenAppender.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+/** Helper class which do code fragment concat. */
+public class PbCodegenAppender {
+    private StringBuilder sb;

Review Comment:
   Another general rule: we make fields `final` whenever possible.
   ```suggestion
       private final StringBuilder sb;
   ```



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.Descriptors;
+
+/**
+ * Serialization schema from Flink to Protobuf types.
+ *
+ * <p>Serializes a {@link RowData } to protobuf binary data.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link FlinkRuntimeException}.
+ */
+public class PbRowDataSerializationSchema implements SerializationSchema<RowData> {
+
+    private final RowType rowType;
+
+    private final PbFormatConfig pbFormatConfig;
+
+    private transient RowToProtoConverter rowToProtoConverter;
+
+    public PbRowDataSerializationSchema(RowType rowType, PbFormatConfig pbFormatConfig) {
+        this.rowType = rowType;
+        this.pbFormatConfig = pbFormatConfig;
+        Descriptors.Descriptor descriptor =
+                PbFormatUtils.getDescriptor(pbFormatConfig.getMessageClassName());
+        new PbSchemaValidator(descriptor, rowType).validate();
+        try {
+            // validate converter in client side to early detect errors
+            rowToProtoConverter = new RowToProtoConverter(rowType, pbFormatConfig);

Review Comment:
   I don't know what kind of errors would be thrown here. As far as I can see, `new PbSchemaValidator(descriptor, rowType).validate()` has already done the work?



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+/** Keeps protobuf constants separately. */
+public class PbConstant {
+    public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
+    public static final String PB_METHOD_PARSE_FROM = "parseFrom";
+    public static final String GENERATED_DECODE_METHOD = "decode";
+    public static final String GENERATED_ENCODE_METHOD = "encode";
+    public static final String PB_MAP_KEY_NAME = "key";
+    public static final String PB_MAP_VALUE_NAME = "value";
+    public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";

Review Comment:
   This is not used?



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+    /**
+     * protobuf code has a bug that, f_abc_7d will be convert to fAbc7d in {@code
+     * com.google.protobuf.Descriptors.FileDescriptor.getJsonName()}, but actually we need fAbc7D.
+     */

Review Comment:
   ```suggestion
       /**
        * protobuf code has a bug that, f_abc_7d will be converted to fAbc7d in {@link
        * Descriptors.FieldDescriptor#getJsonName()}, but actually we need fAbc7D.
        */
   ```
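
   For context, a small example of the divergence that javadoc describes (the expected outputs follow the comment itself: protobuf's `getJsonName()` reports `fAbc7d` for the field `f_abc_7d`, while the format's own conversion is expected to produce `fAbc7D`):
   ```java
   import org.apache.flink.formats.protobuf.PbFormatUtils;

   public class JsonNameExample {
       public static void main(String[] args) {
           // PbFormatUtils.fieldNameToJsonName is the PR's replacement for getJsonName().
           System.out.println(PbFormatUtils.fieldNameToJsonName("f_abc_7d")); // expected: fAbc7D
       }
   }
   ```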



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSerializeFactory.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+
+/** Codegen factory class which return {@link PbCodegenSerializer} of different data type. */
+public class PbCodegenSerializeFactory {
+    public static PbCodegenSerializer getPbCodegenSer(
+            Descriptors.FieldDescriptor fd, LogicalType type, PbFormatContext formatContext)

Review Comment:
   I don't know if there is an enforced rule about method signature style, but personally I prefer the style below:
   ```java
   public static PbCodegenSerializer getPbCodegenSer(
           Descriptors.FieldDescriptor fd,
           LogicalType type,
           PbFormatContext formatContext) throws PbCodegenException {
       // code;
   }
   ```



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodegenAppender.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+/** Helper class which do code fragment concat. */
+public class PbCodegenAppender {
+    private StringBuilder sb;
+
+    public PbCodegenAppender() {

Review Comment:
   How about adding an `indent` to it, and assigning a suitable value in `PbCodegenDeserializer` and `PbCodegenSerializer`? This would make the generated code much more readable.
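
   A rough sketch of what an indent-aware appender could look like; the constructor parameter and the `begin`/`end` helpers are purely illustrative and not part of the PR:
   ```java
   /** Sketch: code appender that prefixes every emitted line with the current indentation. */
   public class IndentingAppenderExample {
       private static final String INDENT_UNIT = "  ";

       private final StringBuilder sb = new StringBuilder();
       private int indentLevel;

       public IndentingAppenderExample(int initialIndentLevel) {
           this.indentLevel = initialIndentLevel;
       }

       public void appendLine(String code) {
           appendIndent();
           sb.append(code).append(";\n");
       }

       public void appendSegment(String code) {
           appendIndent();
           sb.append(code).append("\n");
       }

       /** Increase the indent, e.g. after emitting an opening brace. */
       public void begin() {
           indentLevel++;
       }

       /** Decrease the indent, e.g. before emitting a closing brace. */
       public void end() {
           indentLevel--;
       }

       public String code() {
           return sb.toString();
       }

       private void appendIndent() {
           for (int i = 0; i < indentLevel; i++) {
               sb.append(INDENT_UNIT);
           }
       }
   }
   ```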



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+
+/** Validation class to verify protobuf definition and flink schema. */
+public class PbSchemaValidator {
+    private Descriptors.Descriptor descriptor;
+    private RowType rowType;
+
+    private static final EnumMap<JavaType, EnumSet<LogicalTypeRoot>> TYPE_MATCH_MAP =
+            new EnumMap(JavaType.class);
+
+    static {
+        TYPE_MATCH_MAP.put(JavaType.BOOLEAN, EnumSet.of(LogicalTypeRoot.BOOLEAN));
+        TYPE_MATCH_MAP.put(
+                JavaType.BYTE_STRING,
+                EnumSet.of(LogicalTypeRoot.BINARY, LogicalTypeRoot.VARBINARY));
+        TYPE_MATCH_MAP.put(JavaType.DOUBLE, EnumSet.of(LogicalTypeRoot.DOUBLE));
+        TYPE_MATCH_MAP.put(JavaType.FLOAT, EnumSet.of(LogicalTypeRoot.FLOAT));
+        TYPE_MATCH_MAP.put(
+                JavaType.ENUM,
+                EnumSet.of(
+                        LogicalTypeRoot.VARCHAR,
+                        LogicalTypeRoot.CHAR,
+                        LogicalTypeRoot.TINYINT,
+                        LogicalTypeRoot.SMALLINT,
+                        LogicalTypeRoot.INTEGER,
+                        LogicalTypeRoot.BIGINT));
+        TYPE_MATCH_MAP.put(
+                JavaType.STRING, EnumSet.of(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.CHAR));
+        TYPE_MATCH_MAP.put(JavaType.INT, EnumSet.of(LogicalTypeRoot.INTEGER));
+        TYPE_MATCH_MAP.put(JavaType.LONG, EnumSet.of(LogicalTypeRoot.BIGINT));
+    }
+
+    public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType rowType) {
+        this.descriptor = descriptor;
+        this.rowType = rowType;
+    }
+
+    public Descriptors.Descriptor getDescriptor() {
+        return descriptor;
+    }
+
+    public void setDescriptor(Descriptors.Descriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public void setRowType(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    public void validate() {
+        validateTypeMatch(descriptor, rowType);
+    }
+
+    /**
+     * Validate type match of row type.
+     *
+     * @param descriptor the {@link Descriptors.Descriptor} of the protobuf object.
+     * @param rowType the corresponding {@link RowType} to the {@link Descriptors.Descriptor}
+     */
+    public void validateTypeMatch(Descriptors.Descriptor descriptor, RowType rowType) {

Review Comment:
   `public` -> `private`.  
   Another general rule: we should keep methods `private` whenever possible.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+    /**
+     * protobuf code has a bug that, f_abc_7d will be convert to fAbc7d in {@code
+     * com.google.protobuf.Descriptors.FileDescriptor.getJsonName()}, but actually we need fAbc7D.
+     */
+    public static String fieldNameToJsonName(String name) {
+        final int length = name.length();
+        StringBuilder result = new StringBuilder(length);
+        boolean isNextUpperCase = false;
+        for (int i = 0; i < length; i++) {
+            char ch = name.charAt(i);
+            if (ch == '_') {
+                isNextUpperCase = true;
+            } else if (isNextUpperCase) {
+                if ('a' <= ch && ch <= 'z') {
+                    ch = (char) (ch - 'a' + 'A');
+                    isNextUpperCase = false;
+                }
+                result.append(ch);
+            } else {
+                result.append(ch);
+            }
+        }
+        return result.toString();
+    }
+
+    public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
+        if (null != descriptor.getContainingType()) {
+            // nested type
+            String parentJavaFullName =
+                    getFullJavaName(descriptor.getContainingType(), outerProtoName);
+            return parentJavaFullName + "." + descriptor.getName();
+        } else {
+            // top level message
+            return outerProtoName + descriptor.getName();
+        }
+    }
+
+    public static String getFullJavaName(
+            Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
+        if (null != enumDescriptor.getContainingType()) {
+            return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
+                    + "."
+                    + enumDescriptor.getName();
+        } else {
+            return outerProtoName + enumDescriptor.getName();
+        }
+    }
+
+    public static boolean isSimpleType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    public static String getStrongCamelCaseJsonName(String name) {
+        String jsonName = fieldNameToJsonName(name);
+        if (jsonName.length() == 1) {
+            return jsonName.toUpperCase();
+        } else {
+            return jsonName.substring(0, 1).toUpperCase() + jsonName.substring(1);
+        }
+    }
+
+    public static String getOuterProtoPrefix(String name) {
+        name = name.replace('$', '.');
+        int index = name.lastIndexOf('.');
+        if (index != -1) {
+            // include dot
+            return name.substring(0, index + 1);
+        } else {
+            return "";
+        }
+    }
+
+    public static Descriptors.Descriptor getDescriptor(String className) {
+        try {
+            Class<?> pbClass =
+                    Class.forName(className, true, Thread.currentThread().getContextClassLoader());
+            return (Descriptors.Descriptor)
+                    pbClass.getMethod(PbConstant.PB_METHOD_GET_DESCRIPTOR).invoke(null);
+        } catch (Exception y) {

Review Comment:
   `y` -> `e`



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+
+/** Validation class to verify protobuf definition and flink schema. */
+public class PbSchemaValidator {
+    private Descriptors.Descriptor descriptor;
+    private RowType rowType;
+
+    private static final EnumMap<JavaType, EnumSet<LogicalTypeRoot>> TYPE_MATCH_MAP =
+            new EnumMap(JavaType.class);
+
+    static {
+        TYPE_MATCH_MAP.put(JavaType.BOOLEAN, EnumSet.of(LogicalTypeRoot.BOOLEAN));
+        TYPE_MATCH_MAP.put(
+                JavaType.BYTE_STRING,
+                EnumSet.of(LogicalTypeRoot.BINARY, LogicalTypeRoot.VARBINARY));
+        TYPE_MATCH_MAP.put(JavaType.DOUBLE, EnumSet.of(LogicalTypeRoot.DOUBLE));
+        TYPE_MATCH_MAP.put(JavaType.FLOAT, EnumSet.of(LogicalTypeRoot.FLOAT));
+        TYPE_MATCH_MAP.put(
+                JavaType.ENUM,
+                EnumSet.of(
+                        LogicalTypeRoot.VARCHAR,
+                        LogicalTypeRoot.CHAR,
+                        LogicalTypeRoot.TINYINT,
+                        LogicalTypeRoot.SMALLINT,
+                        LogicalTypeRoot.INTEGER,
+                        LogicalTypeRoot.BIGINT));
+        TYPE_MATCH_MAP.put(
+                JavaType.STRING, EnumSet.of(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.CHAR));
+        TYPE_MATCH_MAP.put(JavaType.INT, EnumSet.of(LogicalTypeRoot.INTEGER));
+        TYPE_MATCH_MAP.put(JavaType.LONG, EnumSet.of(LogicalTypeRoot.BIGINT));
+    }
+
+    public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType rowType) {
+        this.descriptor = descriptor;
+        this.rowType = rowType;
+    }
+
+    public Descriptors.Descriptor getDescriptor() {

Review Comment:
   These methods are not used.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {

Review Comment:
   How about we put these util classes into a separate package `util`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org