You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/19 12:53:07 UTC

[flink] branch master updated: [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type

This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ea4476c054 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
7ea4476c054 is described below

commit 7ea4476c0544e17798cbb1e39609827954f6c266
Author: laughingman7743 <la...@gmail.com>
AuthorDate: Wed Jan 4 23:12:19 2023 +0900

    [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
    
    Close #21613
---
 .../docs/connectors/table/formats/protobuf.md      |  5 ++
 flink-formats/flink-protobuf/pom.xml               |  1 +
 .../apache/flink/formats/protobuf/PbConstant.java  |  1 +
 .../flink/formats/protobuf/PbFormatContext.java    |  8 +--
 .../deserialize/PbCodegenArrayDeserializer.java    |  3 +-
 .../deserialize/PbCodegenMapDeserializer.java      |  6 +-
 .../deserialize/PbCodegenRowDeserializer.java      |  3 +-
 .../protobuf/deserialize/ProtoToRowConverter.java  |  6 +-
 .../serialize/PbCodegenArraySerializer.java        |  3 +-
 .../protobuf/serialize/PbCodegenMapSerializer.java |  6 +-
 .../protobuf/serialize/PbCodegenRowSerializer.java | 11 +---
 .../serialize/PbCodegenSimpleSerializer.java       |  8 +--
 .../protobuf/serialize/RowToProtoConverter.java    |  4 +-
 .../formats/protobuf/util/PbCodegenUtils.java      | 19 +++----
 .../flink/formats/protobuf/util/PbFormatUtils.java | 58 +++++++++++++++-----
 ...RowTest.java => MetaNoMultiProtoToRowTest.java} | 18 ++----
 .../formats/protobuf/MetaOuterNoMultiTest.java     |  4 +-
 .../protobuf/SameOuterClassNameProtoToRowTest.java | 61 +++++++++++++++++++++
 .../protobuf/SameOuterClassNameRowToProtoTest.java | 56 +++++++++++++++++++
 .../formats/protobuf/SimpleProtoToRowTest.java     | 45 +++++++++------
 .../formats/protobuf/SimpleRowToProtoTest.java     | 49 ++++++++++-------
 .../protobuf/TimestampMultiProtoToRowTest.java     | 46 ++++++++++++++++
 .../protobuf/TimestampMultiRowToProtoTest.java}    | 32 ++++++-----
 .../protobuf/TimestampNoMultiProtoToRowTest.java   | 47 ++++++++++++++++
 .../protobuf/TimestampNoMultiRowToProtoTest.java   | 44 +++++++++++++++
 .../TimestampOuterMultiProtoToRowTest.java         | 49 +++++++++++++++++
 .../TimestampOuterMultiRowToProtoTest.java         | 44 +++++++++++++++
 .../TimestampOuterNoMultiProtoToRowTest.java       | 47 ++++++++++++++++
 .../TimestampOuterNoMultiRowToProtoTest.java       | 47 ++++++++++++++++
 .../flink-protobuf/src/test/proto/test_map.proto   | 18 +++---
 .../test/proto/test_multiple_level_message.proto   | 26 ++++-----
 .../flink-protobuf/src/test/proto/test_null.proto  | 64 +++++++++++-----------
 .../flink-protobuf/src/test/proto/test_oneof.proto |  8 +--
 .../flink-protobuf/src/test/proto/test_pb3.proto   | 48 ++++++++--------
 .../src/test/proto/test_repeated.proto             | 13 ++---
 .../src/test/proto/test_repeated_message.proto     | 12 ++--
 ...neof.proto => test_same_outer_class_name.proto} | 16 +++---
 .../{test_simple.proto => test_simple_multi.proto} | 51 +++++++++--------
 .../test/proto/test_simple_no_java_package.proto   | 40 +++++++-------
 ...ter_nomulti.proto => test_simple_nomulti.proto} | 30 ++++++----
 .../src/test/proto/test_simple_outer_multi.proto   | 23 ++++----
 .../src/test/proto/test_simple_outer_nomulti.proto | 26 ++++-----
 ...test_oneof.proto => test_timestamp_multi.proto} | 10 ++--
 ...st_oneof.proto => test_timestamp_nomulti.proto} | 12 ++--
 ...sage.proto => test_timestamp_outer_multi.proto} | 14 ++---
 ...ge.proto => test_timestamp_outer_nomulti.proto} | 16 ++----
 46 files changed, 806 insertions(+), 352 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md b/docs/content.zh/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content.zh/docs/connectors/table/formats/protobuf.md
+++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type.
       <td><code>enum</code></td>
       <td>The enum value of protobuf can be mapped to string or number of flink row accordingly.</td>
     </tr>
+    <tr>
+      <td><code>ROW&lt;seconds BIGINT, nanos INT&gt;</code></td>
+      <td><code>google.protobuf.Timestamp</code></td>
+      <td>The google.protobuf.Timestamp type is mapped to a row holding the seconds and the fraction of a second (at nanosecond resolution) of UTC epoch time, mirroring the <code>seconds</code> and <code>nanos</code> fields of its protobuf definition.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml
index be38a1a4242..da593b8bc4a 100644
--- a/flink-formats/flink-protobuf/pom.xml
+++ b/flink-formats/flink-protobuf/pom.xml
@@ -109,6 +109,7 @@ under the License.
 						</goals>
 						<configuration>
 							<protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
+							<includeMavenTypes>direct</includeMavenTypes>
 							<inputDirectories>
 								<include>src/test/proto</include>
 							</inputDirectories>
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index a886768534d..ea7d6514c56 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -26,4 +26,5 @@ public class PbConstant {
     public static final String GENERATED_ENCODE_METHOD = "encode";
     public static final String PB_MAP_KEY_NAME = "key";
     public static final String PB_MAP_VALUE_NAME = "value";
+    public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
 }
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 06370f3fa6e..27ceb0fb49d 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -20,18 +20,12 @@ package org.apache.flink.formats.protobuf;
 
 /** store config and common information. */
 public class PbFormatContext {
-    private final String outerPrefix;
     private final PbFormatConfig pbFormatConfig;
 
-    public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) {
-        this.outerPrefix = outerPrefix;
+    public PbFormatContext(PbFormatConfig pbFormatConfig) {
         this.pbFormatConfig = pbFormatConfig;
     }
 
-    public String getOuterPrefix() {
-        return outerPrefix;
-    }
-
     public PbFormatConfig getPbFormatConfig() {
         return pbFormatConfig;
     }
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
index 65bc760b0a3..202322edbbf 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
@@ -50,8 +50,7 @@ public class PbCodegenArrayDeserializer implements PbCodegenDeserializer {
         PbCodegenAppender appender = new PbCodegenAppender(indent);
         PbCodegenVarId varUid = PbCodegenVarId.getInstance();
         int uid = varUid.getAndIncrement();
-        String protoTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
+        String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
         String listPbVar = "list" + uid;
         String flinkArrVar = "newArr" + uid;
         String flinkArrEleVar = "subReturnVar" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
index 439332e29e7..fed7c84ebba 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
@@ -58,10 +58,8 @@ public class PbCodegenMapDeserializer implements PbCodegenDeserializer {
                 fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
 
         PbCodegenAppender appender = new PbCodegenAppender(indent);
-        String pbKeyTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
-        String pbValueTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
+        String pbKeyTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
+        String pbValueTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);
         String pbMapVar = "pbMap" + uid;
         String pbMapEntryVar = "pbEntry" + uid;
         String resultDataMapVar = "resultDataMap" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index 3a5b3113166..d1fc0c60726 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -54,8 +54,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
         String flinkRowDataVar = "rowData" + uid;
 
         int fieldSize = rowType.getFieldNames().size();
-        String pbMessageTypeStr =
-                PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
+        String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
         appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode);
         appender.appendLine(
                 "GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 0ecbbaef19e..4564efce2a4 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -58,8 +58,6 @@ public class ProtoToRowConverter {
     public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
             throws PbCodegenException {
         try {
-            String outerPrefix =
-                    PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
             Descriptors.Descriptor descriptor =
                     PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
             Class<?> messageClass =
@@ -67,7 +65,7 @@ public class ProtoToRowConverter {
                             formatConfig.getMessageClassName(),
                             true,
                             Thread.currentThread().getContextClassLoader());
-            String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
+            String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
             if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
                 // pb3 always read default values
                 formatConfig =
@@ -78,7 +76,7 @@ public class ProtoToRowConverter {
                                 formatConfig.getWriteNullStringLiterals());
             }
             PbCodegenAppender codegenAppender = new PbCodegenAppender();
-            PbFormatContext pbFormatContext = new PbFormatContext(outerPrefix, formatConfig);
+            PbFormatContext pbFormatContext = new PbFormatContext(formatConfig);
             String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
             String generatedClassName = "GeneratedProtoToRow_" + uuid;
             String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
index 9d242d96e43..0f2f1bcf068 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
@@ -50,8 +50,7 @@ public class PbCodegenArraySerializer implements PbCodegenSerializer {
         PbCodegenVarId varUid = PbCodegenVarId.getInstance();
         int uid = varUid.getAndIncrement();
         PbCodegenAppender appender = new PbCodegenAppender(indent);
-        String protoTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
+        String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
         String pbListVar = "pbList" + uid;
         String flinkArrayDataVar = "arrData" + uid;
         String pbElementVar = "elementPbVar" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
index c0c9e383a00..3d223218c0c 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
@@ -57,10 +57,8 @@ public class PbCodegenMapSerializer implements PbCodegenSerializer {
                 fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
 
         PbCodegenAppender appender = new PbCodegenAppender(indent);
-        String keyProtoTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
-        String valueProtoTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
+        String keyProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
+        String valueProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);
 
         String flinkKeyArrDataVar = "keyArrData" + uid;
         String flinkValueArrDataVar = "valueArrData" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 5f085a779e6..083a628b70d 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -52,8 +52,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
         int uid = varUid.getAndIncrement();
         PbCodegenAppender appender = new PbCodegenAppender(indent);
         String flinkRowDataVar = "rowData" + uid;
-        String pbMessageTypeStr =
-                PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
+        String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
         String messageBuilderVar = "messageBuilder" + uid;
         appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode);
         appender.appendLine(
@@ -71,15 +70,11 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
             String elementPbVar = "elementPbVar" + subUid;
             String elementPbTypeStr;
             if (elementFd.isMapField()) {
-                elementPbTypeStr =
-                        PbCodegenUtils.getTypeStrFromProto(
-                                elementFd, false, formatContext.getOuterPrefix());
+                elementPbTypeStr = PbCodegenUtils.getTypeStrFromProto(elementFd, false);
             } else {
                 elementPbTypeStr =
                         PbCodegenUtils.getTypeStrFromProto(
-                                elementFd,
-                                PbFormatUtils.isArrayType(subType),
-                                formatContext.getOuterPrefix());
+                                elementFd, PbFormatUtils.isArrayType(subType));
             }
             String strongCamelFieldName = PbFormatUtils.getStrongCamelCaseJsonName(fieldName);
 
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
index 29d4bc9ebd8..ccf44f8283c 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
@@ -60,9 +60,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
             case SMALLINT:
             case TINYINT:
                 if (fd.getJavaType() == JavaType.ENUM) {
-                    String enumTypeStr =
-                            PbFormatUtils.getFullJavaName(
-                                    fd.getEnumType(), formatContext.getOuterPrefix());
+                    String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
                     appender.appendLine(
                             resultVar
                                     + " = "
@@ -86,9 +84,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
                 appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()");
                 if (fd.getJavaType() == JavaType.ENUM) {
                     String enumValueDescVar = "enumValueDesc" + uid;
-                    String enumTypeStr =
-                            PbFormatUtils.getFullJavaName(
-                                    fd.getEnumType(), formatContext.getOuterPrefix());
+                    String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
                     appender.appendLine(
                             "Descriptors.EnumValueDescriptor "
                                     + enumValueDescVar
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
index 349049da02d..df701a515a2 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
@@ -55,11 +55,9 @@ public class RowToProtoConverter {
     public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
             throws PbCodegenException {
         try {
-            String outerPrefix =
-                    PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
-            PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);
             Descriptors.Descriptor descriptor =
                     PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
+            PbFormatContext formatContext = new PbFormatContext(formatConfig);
 
             PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
             String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
index 9cf262ee4be..042392e302b 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
@@ -78,7 +78,7 @@ public class PbCodegenUtils {
      * @return The returned code phrase will be used as java type str in codegen sections.
      * @throws PbCodegenException
      */
-    public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, String outerPrefix)
+    public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList)
             throws PbCodegenException {
         String typeStr;
         switch (fd.getJavaType()) {
@@ -90,12 +90,12 @@ public class PbCodegenUtils {
                     FieldDescriptor valueFd =
                             fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
                     // key and value cannot be repeated
-                    String keyTypeStr = getTypeStrFromProto(keyFd, false, outerPrefix);
-                    String valueTypeStr = getTypeStrFromProto(valueFd, false, outerPrefix);
+                    String keyTypeStr = getTypeStrFromProto(keyFd, false);
+                    String valueTypeStr = getTypeStrFromProto(valueFd, false);
                     typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
                 } else {
                     // simple message
-                    typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
+                    typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType());
                 }
                 break;
             case INT:
@@ -108,7 +108,7 @@ public class PbCodegenUtils {
                 typeStr = "String";
                 break;
             case ENUM:
-                typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
+                typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
                 break;
             case FLOAT:
                 typeStr = "Float";
@@ -174,11 +174,10 @@ public class PbCodegenUtils {
     public static String pbDefaultValueCode(
             FieldDescriptor fieldDescriptor, PbFormatContext pbFormatContext)
             throws PbCodegenException {
-        String outerPrefix = pbFormatContext.getOuterPrefix();
         String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
         switch (fieldDescriptor.getJavaType()) {
             case MESSAGE:
-                return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
+                return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType())
                         + ".getDefaultInstance()";
             case INT:
                 return "0";
@@ -187,7 +186,7 @@ public class PbCodegenUtils {
             case STRING:
                 return "\"" + nullLiteral + "\"";
             case ENUM:
-                return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix)
+                return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType())
                         + ".values()[0]";
             case FLOAT:
                 return "0.0f";
@@ -229,9 +228,7 @@ public class PbCodegenUtils {
         int uid = varUid.getAndIncrement();
         String flinkElementVar = "elementVar" + uid;
         PbCodegenAppender appender = new PbCodegenAppender(indent);
-        String protoTypeStr =
-                PbCodegenUtils.getTypeStrFromProto(
-                        elementPbFd, false, pbFormatContext.getOuterPrefix());
+        String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementPbFd, false);
         String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementDataType);
         appender.appendLine(protoTypeStr + " " + resultPbVar);
         appender.begin("if(" + flinkArrDataVar + ".isNullAt(" + iVar + ")){");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
index 84cd35c98cf..1f972bb5752 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
@@ -28,25 +28,25 @@ import com.google.protobuf.ProtobufInternalUtils;
 
 /** Protobuf function util. */
 public class PbFormatUtils {
-    public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
+    public static String getFullJavaName(Descriptors.Descriptor descriptor) {
         if (null != descriptor.getContainingType()) {
             // nested type
-            String parentJavaFullName =
-                    getFullJavaName(descriptor.getContainingType(), outerProtoName);
+            String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
             return parentJavaFullName + "." + descriptor.getName();
         } else {
             // top level message
+            String outerProtoName = getOuterProtoPrefix(descriptor.getFile());
             return outerProtoName + descriptor.getName();
         }
     }
 
-    public static String getFullJavaName(
-            Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
+    public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
         if (null != enumDescriptor.getContainingType()) {
-            return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
+            return getFullJavaName(enumDescriptor.getContainingType())
                     + "."
                     + enumDescriptor.getName();
         } else {
+            String outerProtoName = getOuterProtoPrefix(enumDescriptor.getFile());
             return outerProtoName + enumDescriptor.getName();
         }
     }
@@ -72,14 +72,46 @@ public class PbFormatUtils {
         return ProtobufInternalUtils.underScoreToCamelCase(name, true);
     }
 
-    public static String getOuterProtoPrefix(String name) {
-        name = name.replace('$', '.');
-        int index = name.lastIndexOf('.');
-        if (index != -1) {
-            // include dot
-            return name.substring(0, index + 1);
+    public static String getOuterClassName(Descriptors.FileDescriptor fileDescriptor) {
+        if (fileDescriptor.getOptions().hasJavaOuterClassname()) {
+            return fileDescriptor.getOptions().getJavaOuterClassname();
         } else {
-            return "";
+            String[] fileNames = fileDescriptor.getName().split("/");
+            String fileName = fileNames[fileNames.length - 1];
+            String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]);
+            // https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
+            // The name of the wrapper class is determined by converting the base name of the .proto
+            // file to camel case if the java_outer_classname option is not specified.
+            // For example, foo_bar.proto produces the class name FooBar. If there is a service,
+            // enum, or message (including nested types) in the file with the same name,
+            // "OuterClass" will be appended to the wrapper class's name.
+            boolean hasSameNameMessage =
+                    fileDescriptor.getMessageTypes().stream()
+                            .anyMatch(f -> f.getName().equals(outerName));
+            boolean hasSameNameEnum =
+                    fileDescriptor.getEnumTypes().stream()
+                            .anyMatch(f -> f.getName().equals(outerName));
+            boolean hasSameNameService =
+                    fileDescriptor.getServices().stream()
+                            .anyMatch(f -> f.getName().equals(outerName));
+            if (hasSameNameMessage || hasSameNameEnum || hasSameNameService) {
+                return outerName + PbConstant.PB_OUTER_CLASS_SUFFIX;
+            } else {
+                return outerName;
+            }
+        }
+    }
+
+    public static String getOuterProtoPrefix(Descriptors.FileDescriptor fileDescriptor) {
+        String javaPackageName =
+                fileDescriptor.getOptions().hasJavaPackage()
+                        ? fileDescriptor.getOptions().getJavaPackage()
+                        : fileDescriptor.getPackage();
+        if (fileDescriptor.getOptions().getJavaMultipleFiles()) {
+            return javaPackageName + ".";
+        } else {
+            String outerClassName = getOuterClassName(fileDescriptor);
+            return javaPackageName + "." + outerClassName + ".";
         }
     }
 
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
similarity index 78%
rename from flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java
rename to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
index 32269393e17..bb83169d96c 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.protobuf;
 
 import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
 import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
-import org.apache.flink.formats.protobuf.testproto.TestSimpleNoouterNomulti;
+import org.apache.flink.formats.protobuf.testproto.TestSimpleNomulti;
 import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,18 +39,15 @@ import org.junit.Test;
  *
  * <p>It is valid proto definition.
  */
-public class MetaNoOuterNoMultiProtoToRowTest {
+public class MetaNoMultiProtoToRowTest {
     @Test
     public void testSimple() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
-                        TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor());
+                        TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(
-                        TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(),
-                        false,
-                        false,
-                        "");
+                        TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, "");
         new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig)
                 .open(null);
 
@@ -62,13 +59,10 @@ public class MetaNoOuterNoMultiProtoToRowTest {
     public void testOuterClassName() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
-                        TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor());
+                        TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(
-                        TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(),
-                        false,
-                        false,
-                        "");
+                        TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, "");
         new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig)
                 .open(null);
         new PbRowDataSerializationSchema(rowType, formatConfig).open(null);
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
index 7e70a5561b5..afcd3498bb0 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
@@ -45,10 +45,10 @@ public class MetaOuterNoMultiTest {
     public void testSimple() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
-                        SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.getDescriptor());
+                        SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(
-                        SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.class.getName(),
+                        SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.class.getName(),
                         false,
                         false,
                         "");
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
new file mode 100644
index 00000000000..b049f2fb907
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests proto-to-row conversion of a message nested in TestSameOuterClassNameOuterClass. */
+public class SameOuterClassNameProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+                TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
+                        .setA(1)
+                        .setB(TestSameOuterClassNameOuterClass.FooBar.BAR)
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
+                        testSameOuterClassName.toByteArray());
+
+        assertEquals(1, row.getInt(0)); // field a
+        assertEquals("BAR", row.getString(1).toString()); // field b: enum read as its name
+    }
+
+    @Test
+    public void testIntEnum() throws Exception {
+        TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+                TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
+                        .setB(TestSameOuterClassNameOuterClass.FooBar.BAR)
+                        .build();
+
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
+                        testSameOuterClassName.toByteArray(),
+                        true); // presumably the enum-as-int switch; confirm in ProtobufTestHelper
+        assertEquals(1, row.getInt(1)); // field b: FooBar.BAR is expected as the int 1
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
new file mode 100644
index 00000000000..9e9e3f7fe69
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests row-to-proto conversion into a message nested in TestSameOuterClassNameOuterClass. */
+public class SameOuterClassNameRowToProtoTest {
+    @Test
+    public void testSimple() throws Exception {
+        RowData row = GenericRowData.of(1, StringData.fromString("BAR")); // a, b (enum by name)
+
+        byte[] bytes =
+                ProtobufTestHelper.rowToPbBytes(
+                        row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class);
+        TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+                TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
+        assertEquals(1, testSameOuterClassName.getA());
+        assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB());
+    }
+
+    @Test
+    public void testEnumAsInt() throws Exception {
+        RowData row = GenericRowData.of(1, 1); // a, b (enum supplied as an int value)
+
+        byte[] bytes =
+                ProtobufTestHelper.rowToPbBytes(
+                        row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, true);
+        TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+                TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
+        assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
index f7cfa331109..2409e60db7e 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.formats.protobuf;
 
-import org.apache.flink.formats.protobuf.testproto.SimpleTest;
+import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti;
+import org.apache.flink.formats.protobuf.testproto.Status;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.ByteString;
@@ -33,8 +34,8 @@ import static org.junit.Assert.assertTrue;
 public class SimpleProtoToRowTest {
     @Test
     public void testSimple() throws Exception {
-        SimpleTest simple =
-                SimpleTest.newBuilder()
+        SimpleTestMulti simple =
+                SimpleTestMulti.newBuilder()
                         .setA(1)
                         .setB(2L)
                         .setC(false)
@@ -42,14 +43,15 @@ public class SimpleProtoToRowTest {
                         .setE(0.01)
                         .setF("haha")
                         .setG(ByteString.copyFrom(new byte[] {1}))
-                        .setH(SimpleTest.Corpus.IMAGES)
+                        .setH(SimpleTestMulti.Corpus.IMAGES)
+                        .setI(Status.FINISHED)
                         .setFAbc7D(1) // test fieldNameToJsonName
                         .setVpr6S(2)
                         .build();
 
-        RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray());
+        RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
 
-        assertEquals(10, row.getArity());
+        assertEquals(11, row.getArity());
         assertEquals(1, row.getInt(0));
         assertEquals(2L, row.getLong(1));
         assertFalse(row.getBoolean(2));
@@ -58,14 +60,15 @@ public class SimpleProtoToRowTest {
         assertEquals("haha", row.getString(5).toString());
         assertEquals(1, (row.getBinary(6))[0]);
         assertEquals("IMAGES", row.getString(7).toString());
-        assertEquals(1, row.getInt(8));
-        assertEquals(2, row.getInt(9));
+        assertEquals("FINISHED", row.getString(8).toString());
+        assertEquals(1, row.getInt(9));
+        assertEquals(2, row.getInt(10));
     }
 
     @Test
     public void testNotExistsValueIgnoringDefault() throws Exception {
-        SimpleTest simple =
-                SimpleTest.newBuilder()
+        SimpleTestMulti simple =
+                SimpleTestMulti.newBuilder()
                         .setB(2L)
                         .setC(false)
                         .setD(0.1f)
@@ -73,7 +76,7 @@ public class SimpleProtoToRowTest {
                         .setF("haha")
                         .build();
 
-        RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray());
+        RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
 
         assertTrue(row.isNullAt(0));
         assertFalse(row.isNullAt(1));
@@ -81,13 +84,13 @@ public class SimpleProtoToRowTest {
 
     @Test
     public void testDefaultValues() throws Exception {
-        SimpleTest simple = SimpleTest.newBuilder().build();
+        SimpleTestMulti simple = SimpleTestMulti.newBuilder().build();
 
         RowData row =
                 ProtobufTestHelper.pbBytesToRow(
-                        SimpleTest.class,
+                        SimpleTestMulti.class,
                         simple.toByteArray(),
-                        new PbFormatConfig(SimpleTest.class.getName(), false, true, ""),
+                        new PbFormatConfig(SimpleTestMulti.class.getName(), false, true, ""),
                         false);
 
         assertFalse(row.isNullAt(0));
@@ -98,6 +101,7 @@ public class SimpleProtoToRowTest {
         assertFalse(row.isNullAt(5));
         assertFalse(row.isNullAt(6));
         assertFalse(row.isNullAt(7));
+        assertFalse(row.isNullAt(8));
         assertEquals(10, row.getInt(0));
         assertEquals(100L, row.getLong(1));
         assertFalse(row.getBoolean(2));
@@ -105,13 +109,20 @@ public class SimpleProtoToRowTest {
         assertEquals(0.0d, row.getDouble(4), 0.0001);
         assertEquals("f", row.getString(5).toString());
         assertArrayEquals(ByteString.EMPTY.toByteArray(), row.getBinary(6));
-        assertEquals(SimpleTest.Corpus.UNIVERSAL.toString(), row.getString(7).toString());
+        assertEquals(SimpleTestMulti.Corpus.UNIVERSAL.toString(), row.getString(7).toString());
+        assertEquals(Status.UNSPECIFIED.toString(), row.getString(8).toString());
     }
 
     @Test
     public void testIntEnum() throws Exception {
-        SimpleTest simple = SimpleTest.newBuilder().setH(SimpleTest.Corpus.IMAGES).build();
-        RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray(), true);
+        SimpleTestMulti simple =
+                SimpleTestMulti.newBuilder()
+                        .setH(SimpleTestMulti.Corpus.IMAGES)
+                        .setI(Status.STARTED)
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray(), true);
         assertEquals(2, row.getInt(7));
+        assertEquals(1, row.getInt(8));
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
index eccee930892..04b059b8bcc 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.formats.protobuf;
 
-import org.apache.flink.formats.protobuf.testproto.SimpleTest;
+import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti;
+import org.apache.flink.formats.protobuf.testproto.Status;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -43,21 +44,23 @@ public class SimpleRowToProtoTest {
                         StringData.fromString("hello"),
                         new byte[] {1},
                         StringData.fromString("IMAGES"),
+                        StringData.fromString("FINISHED"),
                         1,
                         2);
 
-        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class);
-        SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
-        assertTrue(simpleTest.hasA());
-        assertEquals(1, simpleTest.getA());
-        assertEquals(2L, simpleTest.getB());
-        assertFalse(simpleTest.getC());
-        assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTest.getD()));
-        assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTest.getE()));
-        assertEquals("hello", simpleTest.getF());
-        assertEquals(1, simpleTest.getG().byteAt(0));
-        assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH());
-        assertEquals(1, simpleTest.getFAbc7D());
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
+        SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+        assertTrue(simpleTestMulti.hasA());
+        assertEquals(1, simpleTestMulti.getA());
+        assertEquals(2L, simpleTestMulti.getB());
+        assertFalse(simpleTestMulti.getC());
+        assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTestMulti.getD()));
+        assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTestMulti.getE()));
+        assertEquals("hello", simpleTestMulti.getF());
+        assertEquals(1, simpleTestMulti.getG().byteAt(0));
+        assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
+        assertEquals(Status.FINISHED, simpleTestMulti.getI());
+        assertEquals(1, simpleTestMulti.getFAbc7D());
     }
 
     @Test
@@ -72,14 +75,16 @@ public class SimpleRowToProtoTest {
                         StringData.fromString("hello"),
                         null,
                         null,
+                        null,
                         1,
                         2);
 
-        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class);
-        SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
-        assertFalse(simpleTest.hasA());
-        assertFalse(simpleTest.hasG());
-        assertFalse(simpleTest.hasH());
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
+        SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+        assertFalse(simpleTestMulti.hasA());
+        assertFalse(simpleTestMulti.hasG());
+        assertFalse(simpleTestMulti.hasH());
+        assertFalse(simpleTestMulti.hasI());
     }
 
     @Test
@@ -87,10 +92,12 @@ public class SimpleRowToProtoTest {
         RowData row =
                 GenericRowData.of(
                         null, null, null, null, null, null, null, 2, // CORPUS: IMAGE
+                        1, // STATUS: STARTED
                         null, null);
 
-        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class, true);
-        SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
-        assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH());
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class, true);
+        SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+        assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
+        assertEquals(Status.STARTED, simpleTestMulti.getI());
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
new file mode 100644
index 00000000000..e567a789da1
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests conversion of proto timestamp data with the multiple_files option to Flink data. */
+public class TimestampMultiProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        TimestampTestMulti timestampTestMulti =
+                TimestampTestMulti.newBuilder()
+                        .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TimestampTestMulti.class, timestampTestMulti.toByteArray());
+
+        RowData rowData = row.getRow(0, 2); // the ts field is read back as a 2-field nested row
+        assertEquals(1672498800, rowData.getLong(0)); // seconds: 2022-12-31T15:00:00Z
+        assertEquals(123, rowData.getInt(1)); // nanos
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
similarity index 50%
copy from flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
copy to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
index 06370f3fa6e..213f42661a2 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
@@ -18,21 +18,27 @@
 
 package org.apache.flink.formats.protobuf;
 
-/** store config and common information. */
-public class PbFormatContext {
-    private final String outerPrefix;
-    private final PbFormatConfig pbFormatConfig;
+import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 
-    public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) {
-        this.outerPrefix = outerPrefix;
-        this.pbFormatConfig = pbFormatConfig;
-    }
+import org.junit.Test;
 
-    public String getOuterPrefix() {
-        return outerPrefix;
-    }
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test conversion of flink internal primitive data to proto timestamp data with multiple_files
+ * options.
+ */
+public class TimestampMultiRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
 
-    public PbFormatConfig getPbFormatConfig() {
-        return pbFormatConfig;
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestMulti.class);
+        TimestampTestMulti timestampTestMulti = TimestampTestMulti.parseFrom(bytes);
+        assertEquals(1672498800, timestampTestMulti.getTs().getSeconds());
+        assertEquals(123, timestampTestMulti.getTs().getNanos());
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
new file mode 100644
index 00000000000..55917933bc2
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests conversion of proto timestamp data nested in TestTimestampNomulti to Flink data. */
+public class TimestampNoMultiProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
+                TestTimestampNomulti.TimestampTestNoMulti.newBuilder()
+                        .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TestTimestampNomulti.TimestampTestNoMulti.class,
+                        timestampTestNoMulti.toByteArray());
+
+        RowData rowData = row.getRow(0, 2); // the ts field is read back as a 2-field nested row
+        assertEquals(1672498800, rowData.getLong(0)); // seconds: 2022-12-31T15:00:00Z
+        assertEquals(123, rowData.getInt(1)); // nanos
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
new file mode 100644
index 00000000000..65cd877b2bf
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests conversion of Flink row data to proto timestamp data in TestTimestampNomulti. */
+public class TimestampNoMultiRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // (seconds, nanos)
+
+        byte[] bytes =
+                ProtobufTestHelper.rowToPbBytes(
+                        row, TestTimestampNomulti.TimestampTestNoMulti.class);
+        TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
+                TestTimestampNomulti.TimestampTestNoMulti.parseFrom(bytes);
+        assertEquals(1672498800, timestampTestNoMulti.getTs().getSeconds());
+        assertEquals(123, timestampTestNoMulti.getTs().getNanos());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
new file mode 100644
index 00000000000..935c17c169b
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests conversion of proto timestamp data, declared with the multiple_files and outer_classname
+ * options, to Flink internal data.
+ */
+public class TimestampOuterMultiProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        TimestampTestOuterMulti timestampTestOuterMulti =
+                TimestampTestOuterMulti.newBuilder()
+                        .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TimestampTestOuterMulti.class, timestampTestOuterMulti.toByteArray());
+
+        RowData rowData = row.getRow(0, 2); // the ts field is read back as a 2-field nested row
+        assertEquals(1672498800, rowData.getLong(0)); // seconds: 2022-12-31T15:00:00Z
+        assertEquals(123, rowData.getInt(1)); // nanos
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
new file mode 100644
index 00000000000..1f27ed60f0f
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests conversion of Flink internal row data to proto timestamp data declared with the
+ * multiple_files and outer_classname options.
+ */
+public class TimestampOuterMultiRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // (seconds, nanos)
+
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestOuterMulti.class);
+        TimestampTestOuterMulti timestampTestOuterMulti = TimestampTestOuterMulti.parseFrom(bytes);
+        assertEquals(1672498800, timestampTestOuterMulti.getTs().getSeconds());
+        assertEquals(123, timestampTestOuterMulti.getTs().getNanos());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
new file mode 100644
index 00000000000..5c2c08fe435
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests converting a proto Timestamp field to Flink data with java_outer_classname set. */
+public class TimestampOuterNoMultiProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
+                TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.newBuilder()
+                        .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+                        .build();
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class,
+                        timestampTestOuterNoMulti.toByteArray());
+
+        RowData rowData = row.getRow(0, 2); // field 0 is the Timestamp: (seconds, nanos)
+        assertEquals(1672498800, rowData.getLong(0));
+        assertEquals(123, rowData.getInt(1));
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
new file mode 100644
index 00000000000..208b49ab780
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests converting Flink internal row data to a proto message with a google.protobuf.Timestamp
+ * field when java_outer_classname is set and java_multiple_files is false.
+ */
+public class TimestampOuterNoMultiRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // seconds, nanos
+
+        byte[] bytes =
+                ProtobufTestHelper.rowToPbBytes(
+                        row, TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class);
+        TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
+                TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.parseFrom(bytes);
+        assertEquals(1672498800, timestampTestOuterNoMulti.getTs().getSeconds());
+        assertEquals(123, timestampTestOuterNoMulti.getTs().getNanos());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_map.proto b/flink-formats/flink-protobuf/src/test/proto/test_map.proto
index e22f771bfe3..cce7daa1143 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_map.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_map.proto
@@ -22,16 +22,14 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message MapTest {
-    optional int32 a = 1;
-    map<string, string> map1 = 2;
-    map<string, InnerMessageTest> map2 = 3;
-    map<string, bytes> map3 = 4;
+  optional int32 a = 1;
+  map<string, string> map1 = 2;
+  map<string, InnerMessageTest> map2 = 3;
+  map<string, bytes> map3 = 4;
 
 
-    message InnerMessageTest{
-          optional int32 a =1;
-          optional int64 b =2;
-    }
+  message InnerMessageTest{
+    optional int32 a = 1;
+    optional int64 b = 2;
+  }
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
index ba0c320d35f..0829f26132b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
@@ -22,21 +22,17 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message MultipleLevelMessageTest {
-    optional int32 a = 1;
-    optional int64 b = 2;
-    optional bool c = 3;
-    optional InnerMessageTest1 d = 4;
+  optional int32 a = 1;
+  optional int64 b = 2;
+  optional bool c = 3;
+  optional InnerMessageTest1 d = 4;
 
-    message InnerMessageTest1{
-          optional InnerMessageTest2 a = 1;
-          optional bool c = 2;
-          message InnerMessageTest2{
-             optional int32 a =1;
-             optional int64 b =2;
-          }
+  message InnerMessageTest1{
+    optional InnerMessageTest2 a = 1;
+    optional bool c = 2;
+    message InnerMessageTest2{
+      optional int32 a = 1;
+      optional int64 b = 2;
     }
+  }
 }
-
-
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_null.proto b/flink-formats/flink-protobuf/src/test/proto/test_null.proto
index b17b15336b4..cd3d16d962c 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_null.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_null.proto
@@ -22,40 +22,38 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message NullTest {
-    map<string, string> string_map = 1;
-    map<int32, int32> int_map = 2;
-    map<int64, int64> long_map = 3;
-    map<bool, bool> boolean_map = 4;
-    map<string, float> float_map = 5;
-    map<string, double> double_map = 6;
-    map<string, Corpus> enum_map = 7;
-    map<string, InnerMessageTest> message_map = 8;
-    map<string, bytes> bytes_map=9;
+  map<string, string> string_map = 1;
+  map<int32, int32> int_map = 2;
+  map<int64, int64> long_map = 3;
+  map<bool, bool> boolean_map = 4;
+  map<string, float> float_map = 5;
+  map<string, double> double_map = 6;
+  map<string, Corpus> enum_map = 7;
+  map<string, InnerMessageTest> message_map = 8;
+  map<string, bytes> bytes_map = 9;
 
-    repeated string string_array = 10;
-    repeated int32 int_array = 11;
-    repeated int64 long_array = 12;
-    repeated bool boolean_array = 13;
-    repeated float float_array = 14;
-    repeated double double_array = 15;
-    repeated Corpus enum_array = 16;
-    repeated InnerMessageTest message_array = 17;
-    repeated bytes bytes_array = 18;
+  repeated string string_array = 10;
+  repeated int32 int_array = 11;
+  repeated int64 long_array = 12;
+  repeated bool boolean_array = 13;
+  repeated float float_array = 14;
+  repeated double double_array = 15;
+  repeated Corpus enum_array = 16;
+  repeated InnerMessageTest message_array = 17;
+  repeated bytes bytes_array = 18;
 
-    message InnerMessageTest{
-          optional int32 a =1;
-          optional int64 b =2;
-    }
+  message InnerMessageTest{
+    optional int32 a = 1;
+    optional int64 b = 2;
+  }
 
-    enum Corpus {
-            UNIVERSAL = 0;
-            WEB = 1;
-            IMAGES = 2;
-            LOCAL = 3;
-            NEWS = 4;
-            PRODUCTS = 5;
-            VIDEO = 7;
-    }
+  enum Corpus {
+    UNIVERSAL = 0;
+    WEB = 1;
+    IMAGES = 2;
+    LOCAL = 3;
+    NEWS = 4;
+    PRODUCTS = 5;
+    VIDEO = 7;
+  }
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
index 814761b2cec..b81cf39a31b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
@@ -22,8 +22,8 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message OneofTest {
-    oneof test_oneof{
-       int32 a = 1;
-       int32 b = 2;
-    }
+  oneof test_oneof{
+    int32 a = 1;
+    int32 b = 2;
+  }
 }
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
index b21ca840f65..bbc5f4df4b1 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
@@ -22,32 +22,30 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message Pb3Test {
+  int32 a = 1;
+  int64 b = 2;
+  string c = 3;
+  float d = 4;
+  double e = 5;
+  Corpus f = 6;
+  InnerMessageTest g = 7;
+  repeated InnerMessageTest h = 8;
+  bytes i = 9;
+  map<string, string> map1 = 10;
+  map<string, InnerMessageTest> map2 = 11;
+
+  message InnerMessageTest{
     int32 a = 1;
     int64 b = 2;
-    string c = 3;
-    float d = 4;
-    double e = 5;
-    Corpus f = 6;
-    InnerMessageTest g = 7;
-    repeated InnerMessageTest h = 8;
-    bytes i = 9;
-    map<string, string> map1 = 10;
-    map<string, InnerMessageTest> map2 = 11;
-
-    message InnerMessageTest{
-          int32 a =1;
-          int64 b =2;
-    }
+  }
 
-    enum Corpus {
-            UNIVERSAL = 0;
-            WEB = 1;
-            IMAGES = 2;
-            LOCAL = 3;
-            NEWS = 4;
-            PRODUCTS = 5;
-            VIDEO = 7;
-    }
+  enum Corpus {
+    UNIVERSAL = 0;
+    WEB = 1;
+    IMAGES = 2;
+    LOCAL = 3;
+    NEWS = 4;
+    PRODUCTS = 5;
+    VIDEO = 7;
+  }
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
index 90cb9a58bed..75dff2ef78b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
@@ -22,11 +22,10 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message RepeatedTest {
-    optional int32 a = 1;
-    repeated int64 b = 2;
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6;
+  optional int32 a = 1;
+  repeated int64 b = 2;
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6;
 }
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
index 2a73a9a111b..4e0e9694521 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
@@ -22,11 +22,9 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
 message RepeatedMessageTest {
-    repeated InnerMessageTest d = 4;
-    message InnerMessageTest{
-      optional int32 a =1;
-      optional int64 b =2;
-    }
+  repeated InnerMessageTest d = 4;
+  message InnerMessageTest{
+    optional int32 a = 1;
+    optional int64 b = 2;
+  }
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
similarity index 86%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
index 814761b2cec..2941b6b65fe 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-syntax = "proto2";
+syntax = "proto3";
 package org.apache.flink.formats.protobuf.proto;
 option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
 
-message OneofTest {
-    oneof test_oneof{
-       int32 a = 1;
-       int32 b = 2;
-    }
+message TestSameOuterClassName {
+  int32 a = 1;
+  FooBar b = 2;
+}
+
+enum FooBar {
+  FOO = 0;
+  BAR = 1;
 }
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
similarity index 60%
rename from flink-formats/flink-protobuf/src/test/proto/test_simple.proto
rename to flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
index e5d01f740c7..9ee527e0784 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
@@ -21,29 +21,34 @@ package org.apache.flink.formats.protobuf.proto;
 option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
-message SimpleTest {
-    optional int32 a = 1 [default=10];
-    optional int64 b = 2 [default=100];
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6 [default="f"];;
-    optional bytes g = 7;
-    optional Corpus h = 8;
-    //this is must because protobuf have some field name parse bug if number is after "_".
-    optional int32 f_abc_7d = 9;
-    optional int32 vpr6s = 10;
-
-    enum Corpus {
-        UNIVERSAL = 0;
-        WEB = 1;
-        IMAGES = 2;
-        LOCAL = 3;
-        NEWS = 4;
-        PRODUCTS = 5;
-        VIDEO = 7;
-      }
+message SimpleTestMulti {
+  optional int32 a = 1 [default = 10];
+  optional int64 b = 2 [default = 100];
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6 [default = "f"];
+  optional bytes g = 7;
+  optional Corpus h = 8;
+  optional Status i = 9;
+  // This is needed because protobuf has a field-name parsing bug when a number follows "_".
+  optional int32 f_abc_7d = 10;
+  optional int32 vpr6s = 11;
 
+  enum Corpus {
+    UNIVERSAL = 0;
+    WEB = 1;
+    IMAGES = 2;
+    LOCAL = 3;
+    NEWS = 4;
+    PRODUCTS = 5;
+    VIDEO = 7;
+  }
 }
 
-
+enum Status {
+  UNSPECIFIED = 0;
+  STARTED = 1;
+  RUNNING = 2;
+  FINISHED = 3;
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
index 334ec0d68e4..649400e0780 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
@@ -21,26 +21,24 @@ package org.apache.flink.formats.protobuf.proto;
 option java_multiple_files = true;
 
 message SimpleTestNoJavaPackage {
-    optional int32 a = 1 [default=10];
-    optional int64 b = 2 [default=100];
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6 [default="f"];
-    optional bytes g = 7;
-    optional Corpus h = 8;
-    //this is must because protobuf have some field name parse bug if number is after "_".
-    optional int32 f_abc_7d = 9;
+  optional int32 a = 1 [default = 10];
+  optional int64 b = 2 [default = 100];
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6 [default = "f"];
+  optional bytes g = 7;
+  optional Corpus h = 8;
+  // This is needed because protobuf has a field-name parsing bug when a number follows "_".
+  optional int32 f_abc_7d = 9;
 
-    enum Corpus {
-        UNIVERSAL = 0;
-        WEB = 1;
-        IMAGES = 2;
-        LOCAL = 3;
-        NEWS = 4;
-        PRODUCTS = 5;
-        VIDEO = 7;
-      }
+  enum Corpus {
+    UNIVERSAL = 0;
+    WEB = 1;
+    IMAGES = 2;
+    LOCAL = 3;
+    NEWS = 4;
+    PRODUCTS = 5;
+    VIDEO = 7;
+  }
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
similarity index 73%
rename from flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto
rename to flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
index 379732d5503..f5ab90002b1 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
@@ -19,20 +19,26 @@
 syntax = "proto2";
 package org.apache.flink.formats.protobuf.proto;
 option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_multiple_files = false;
 
-message SimpleTestNoouterNomulti {
+message SimpleTestNoMulti {
+  optional int32 a = 1;
+  optional int64 b = 2;
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6;
+  optional bytes g = 7;
+  optional InnerMessageTest h = 8;
+  optional Result i = 9;
+
+  message InnerMessageTest{
     optional int32 a = 1;
     optional int64 b = 2;
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6;
-    optional bytes g = 7;
-    optional InnerMessageTest h = 8;
-
-    message InnerMessageTest{
-       optional int32 a =1;
-       optional int64 b =2;
-    }
+  }
 }
 
+enum Result {
+  SUCCESS = 0;
+  FAIL = 1;
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
index ca024b862d6..b47274a4ff2 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
@@ -23,18 +23,17 @@ option java_outer_classname = "SimpleTestOuterMultiProto";
 option java_multiple_files = true;
 
 message SimpleTestOuterMulti {
+  optional int32 a = 1;
+  optional int64 b = 2;
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6;
+  optional bytes g = 7;
+  optional InnerMessageTest h = 8;
+
+  message InnerMessageTest{
     optional int32 a = 1;
     optional int64 b = 2;
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6;
-    optional bytes g = 7;
-    optional InnerMessageTest h = 8;
-
-    message InnerMessageTest{
-           optional int32 a =1;
-           optional int64 b =2;
-    }
+  }
 }
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
index 16459d27250..3185fcaa087 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
@@ -20,20 +20,20 @@ syntax = "proto2";
 package org.apache.flink.formats.protobuf.proto;
 option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_outer_classname = "SimpleTestOuterNomultiProto";
+option java_multiple_files = false;
 
-message SimpleTestOuterNomulti {
+message SimpleTestOuterNoMulti {
+  optional int32 a = 1;
+  optional int64 b = 2;
+  optional bool c = 3;
+  optional float d = 4;
+  optional double e = 5;
+  optional string f = 6;
+  optional bytes g = 7;
+  optional InnerMessageTest h = 8;
+
+  message InnerMessageTest{
     optional int32 a = 1;
     optional int64 b = 2;
-    optional bool c = 3;
-    optional float d = 4;
-    optional double e = 5;
-    optional string f = 6;
-    optional bytes g = 7;
-    optional InnerMessageTest h = 8;
-
-    message InnerMessageTest{
-        optional int32 a =1;
-        optional int64 b =2;
-    }
+  }
 }
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
similarity index 88%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
index 814761b2cec..f7da5c47e62 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
@@ -16,14 +16,12 @@
  * limitations under the License.
  */
 
-syntax = "proto2";
+syntax = "proto3";
 package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
 option java_package = "org.apache.flink.formats.protobuf.testproto";
 option java_multiple_files = true;
 
-message OneofTest {
-    oneof test_oneof{
-       int32 a = 1;
-       int32 b = 2;
-    }
+message TimestampTestMulti {
+  google.protobuf.Timestamp ts = 1;
 }
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
similarity index 84%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
index 814761b2cec..6db2e9dbe15 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
@@ -16,14 +16,12 @@
  * limitations under the License.
  */
 
-syntax = "proto2";
+syntax = "proto3";
 package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
 option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
+option java_multiple_files = false;
 
-message OneofTest {
-    oneof test_oneof{
-       int32 a = 1;
-       int32 b = 2;
-    }
+message TimestampTestNoMulti {
+  google.protobuf.Timestamp ts = 1;
 }
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
similarity index 83%
copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
index 2a73a9a111b..bc21d85f378 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
@@ -16,17 +16,13 @@
  * limitations under the License.
  */
 
-syntax = "proto2";
+syntax = "proto3";
 package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
 option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "TimestampTestOuterMultiProto";
 option java_multiple_files = true;
 
-message RepeatedMessageTest {
-    repeated InnerMessageTest d = 4;
-    message InnerMessageTest{
-      optional int32 a =1;
-      optional int64 b =2;
-    }
+message TimestampTestOuterMulti {
+  google.protobuf.Timestamp ts = 1;
 }
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
similarity index 79%
copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
index 2a73a9a111b..5b571e78a12 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
@@ -16,17 +16,13 @@
  * limitations under the License.
  */
 
-syntax = "proto2";
+syntax = "proto3";
 package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
 option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
+option java_outer_classname = "TimestampTestOuterNomultiProto";
+option java_multiple_files = false;
 
-message RepeatedMessageTest {
-    repeated InnerMessageTest d = 4;
-    message InnerMessageTest{
-      optional int32 a =1;
-      optional int64 b =2;
-    }
+message TimestampTestOuterNoMulti {
+  google.protobuf.Timestamp ts = 1;
 }
-
-