You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/19 12:53:07 UTC
[flink] branch master updated: [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7ea4476c054 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
7ea4476c054 is described below
commit 7ea4476c0544e17798cbb1e39609827954f6c266
Author: laughingman7743 <la...@gmail.com>
AuthorDate: Wed Jan 4 23:12:19 2023 +0900
[FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
Close #21613
---
.../docs/connectors/table/formats/protobuf.md | 5 ++
flink-formats/flink-protobuf/pom.xml | 1 +
.../apache/flink/formats/protobuf/PbConstant.java | 1 +
.../flink/formats/protobuf/PbFormatContext.java | 8 +--
.../deserialize/PbCodegenArrayDeserializer.java | 3 +-
.../deserialize/PbCodegenMapDeserializer.java | 6 +-
.../deserialize/PbCodegenRowDeserializer.java | 3 +-
.../protobuf/deserialize/ProtoToRowConverter.java | 6 +-
.../serialize/PbCodegenArraySerializer.java | 3 +-
.../protobuf/serialize/PbCodegenMapSerializer.java | 6 +-
.../protobuf/serialize/PbCodegenRowSerializer.java | 11 +---
.../serialize/PbCodegenSimpleSerializer.java | 8 +--
.../protobuf/serialize/RowToProtoConverter.java | 4 +-
.../formats/protobuf/util/PbCodegenUtils.java | 19 +++----
.../flink/formats/protobuf/util/PbFormatUtils.java | 58 +++++++++++++++-----
...RowTest.java => MetaNoMultiProtoToRowTest.java} | 18 ++----
.../formats/protobuf/MetaOuterNoMultiTest.java | 4 +-
.../protobuf/SameOuterClassNameProtoToRowTest.java | 61 +++++++++++++++++++++
.../protobuf/SameOuterClassNameRowToProtoTest.java | 56 +++++++++++++++++++
.../formats/protobuf/SimpleProtoToRowTest.java | 45 +++++++++------
.../formats/protobuf/SimpleRowToProtoTest.java | 49 ++++++++++-------
.../protobuf/TimestampMultiProtoToRowTest.java | 46 ++++++++++++++++
.../protobuf/TimestampMultiRowToProtoTest.java} | 32 ++++++-----
.../protobuf/TimestampNoMultiProtoToRowTest.java | 47 ++++++++++++++++
.../protobuf/TimestampNoMultiRowToProtoTest.java | 44 +++++++++++++++
.../TimestampOuterMultiProtoToRowTest.java | 49 +++++++++++++++++
.../TimestampOuterMultiRowToProtoTest.java | 44 +++++++++++++++
.../TimestampOuterNoMultiProtoToRowTest.java | 47 ++++++++++++++++
.../TimestampOuterNoMultiRowToProtoTest.java | 47 ++++++++++++++++
.../flink-protobuf/src/test/proto/test_map.proto | 18 +++---
.../test/proto/test_multiple_level_message.proto | 26 ++++-----
.../flink-protobuf/src/test/proto/test_null.proto | 64 +++++++++++-----------
.../flink-protobuf/src/test/proto/test_oneof.proto | 8 +--
.../flink-protobuf/src/test/proto/test_pb3.proto | 48 ++++++++--------
.../src/test/proto/test_repeated.proto | 13 ++---
.../src/test/proto/test_repeated_message.proto | 12 ++--
...neof.proto => test_same_outer_class_name.proto} | 16 +++---
.../{test_simple.proto => test_simple_multi.proto} | 51 +++++++++--------
.../test/proto/test_simple_no_java_package.proto | 40 +++++++-------
...ter_nomulti.proto => test_simple_nomulti.proto} | 30 ++++++----
.../src/test/proto/test_simple_outer_multi.proto | 23 ++++----
.../src/test/proto/test_simple_outer_nomulti.proto | 26 ++++-----
...test_oneof.proto => test_timestamp_multi.proto} | 10 ++--
...st_oneof.proto => test_timestamp_nomulti.proto} | 12 ++--
...sage.proto => test_timestamp_outer_multi.proto} | 14 ++---
...ge.proto => test_timestamp_outer_nomulti.proto} | 16 ++----
46 files changed, 806 insertions(+), 352 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md b/docs/content.zh/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content.zh/docs/connectors/table/formats/protobuf.md
+++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type.
<td><code>enum</code></td>
<td>The enum value of protobuf can be mapped to string or number of flink row accordingly.</td>
</tr>
+ <tr>
+ <td><code>ROW&lt;seconds BIGINT, nanos INT&gt;</code></td>
+ <td><code>google.protobuf.Timestamp</code></td>
+ <td>The google.protobuf.Timestamp type is mapped to a row whose fields mirror the protobuf definition: <code>seconds</code> holds the seconds of UTC epoch time and <code>nanos</code> holds the fraction of a second at nanosecond resolution.</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml
index be38a1a4242..da593b8bc4a 100644
--- a/flink-formats/flink-protobuf/pom.xml
+++ b/flink-formats/flink-protobuf/pom.xml
@@ -109,6 +109,7 @@ under the License.
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
+ <includeMavenTypes>direct</includeMavenTypes>
<inputDirectories>
<include>src/test/proto</include>
</inputDirectories>
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index a886768534d..ea7d6514c56 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -26,4 +26,5 @@ public class PbConstant {
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
+ public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
}
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 06370f3fa6e..27ceb0fb49d 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -20,18 +20,12 @@ package org.apache.flink.formats.protobuf;
/** store config and common information. */
public class PbFormatContext {
- private final String outerPrefix;
private final PbFormatConfig pbFormatConfig;
- public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) {
- this.outerPrefix = outerPrefix;
+ public PbFormatContext(PbFormatConfig pbFormatConfig) {
this.pbFormatConfig = pbFormatConfig;
}
- public String getOuterPrefix() {
- return outerPrefix;
- }
-
public PbFormatConfig getPbFormatConfig() {
return pbFormatConfig;
}
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
index 65bc760b0a3..202322edbbf 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java
@@ -50,8 +50,7 @@ public class PbCodegenArrayDeserializer implements PbCodegenDeserializer {
PbCodegenAppender appender = new PbCodegenAppender(indent);
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
int uid = varUid.getAndIncrement();
- String protoTypeStr =
- PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
+ String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
String listPbVar = "list" + uid;
String flinkArrVar = "newArr" + uid;
String flinkArrEleVar = "subReturnVar" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
index 439332e29e7..fed7c84ebba 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java
@@ -58,10 +58,8 @@ public class PbCodegenMapDeserializer implements PbCodegenDeserializer {
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
PbCodegenAppender appender = new PbCodegenAppender(indent);
- String pbKeyTypeStr =
- PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
- String pbValueTypeStr =
- PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
+ String pbKeyTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
+ String pbValueTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);
String pbMapVar = "pbMap" + uid;
String pbMapEntryVar = "pbEntry" + uid;
String resultDataMapVar = "resultDataMap" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index 3a5b3113166..d1fc0c60726 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -54,8 +54,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
String flinkRowDataVar = "rowData" + uid;
int fieldSize = rowType.getFieldNames().size();
- String pbMessageTypeStr =
- PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
+ String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode);
appender.appendLine(
"GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 0ecbbaef19e..4564efce2a4 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -58,8 +58,6 @@ public class ProtoToRowConverter {
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
- String outerPrefix =
- PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
Class<?> messageClass =
@@ -67,7 +65,7 @@ public class ProtoToRowConverter {
formatConfig.getMessageClassName(),
true,
Thread.currentThread().getContextClassLoader());
- String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
+ String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
// pb3 always read default values
formatConfig =
@@ -78,7 +76,7 @@ public class ProtoToRowConverter {
formatConfig.getWriteNullStringLiterals());
}
PbCodegenAppender codegenAppender = new PbCodegenAppender();
- PbFormatContext pbFormatContext = new PbFormatContext(outerPrefix, formatConfig);
+ PbFormatContext pbFormatContext = new PbFormatContext(formatConfig);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
String generatedClassName = "GeneratedProtoToRow_" + uuid;
String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
index 9d242d96e43..0f2f1bcf068 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java
@@ -50,8 +50,7 @@ public class PbCodegenArraySerializer implements PbCodegenSerializer {
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
- String protoTypeStr =
- PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
+ String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
String pbListVar = "pbList" + uid;
String flinkArrayDataVar = "arrData" + uid;
String pbElementVar = "elementPbVar" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
index c0c9e383a00..3d223218c0c 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java
@@ -57,10 +57,8 @@ public class PbCodegenMapSerializer implements PbCodegenSerializer {
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
PbCodegenAppender appender = new PbCodegenAppender(indent);
- String keyProtoTypeStr =
- PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
- String valueProtoTypeStr =
- PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
+ String keyProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
+ String valueProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);
String flinkKeyArrDataVar = "keyArrData" + uid;
String flinkValueArrDataVar = "valueArrData" + uid;
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 5f085a779e6..083a628b70d 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -52,8 +52,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
String flinkRowDataVar = "rowData" + uid;
- String pbMessageTypeStr =
- PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
+ String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
String messageBuilderVar = "messageBuilder" + uid;
appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode);
appender.appendLine(
@@ -71,15 +70,11 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
String elementPbVar = "elementPbVar" + subUid;
String elementPbTypeStr;
if (elementFd.isMapField()) {
- elementPbTypeStr =
- PbCodegenUtils.getTypeStrFromProto(
- elementFd, false, formatContext.getOuterPrefix());
+ elementPbTypeStr = PbCodegenUtils.getTypeStrFromProto(elementFd, false);
} else {
elementPbTypeStr =
PbCodegenUtils.getTypeStrFromProto(
- elementFd,
- PbFormatUtils.isArrayType(subType),
- formatContext.getOuterPrefix());
+ elementFd, PbFormatUtils.isArrayType(subType));
}
String strongCamelFieldName = PbFormatUtils.getStrongCamelCaseJsonName(fieldName);
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
index 29d4bc9ebd8..ccf44f8283c 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
@@ -60,9 +60,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
case SMALLINT:
case TINYINT:
if (fd.getJavaType() == JavaType.ENUM) {
- String enumTypeStr =
- PbFormatUtils.getFullJavaName(
- fd.getEnumType(), formatContext.getOuterPrefix());
+ String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
resultVar
+ " = "
@@ -86,9 +84,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()");
if (fd.getJavaType() == JavaType.ENUM) {
String enumValueDescVar = "enumValueDesc" + uid;
- String enumTypeStr =
- PbFormatUtils.getFullJavaName(
- fd.getEnumType(), formatContext.getOuterPrefix());
+ String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
"Descriptors.EnumValueDescriptor "
+ enumValueDescVar
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
index 349049da02d..df701a515a2 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
@@ -55,11 +55,9 @@ public class RowToProtoConverter {
public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
- String outerPrefix =
- PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
- PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
+ PbFormatContext formatContext = new PbFormatContext(formatConfig);
PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
index 9cf262ee4be..042392e302b 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
@@ -78,7 +78,7 @@ public class PbCodegenUtils {
* @return The returned code phrase will be used as java type str in codegen sections.
* @throws PbCodegenException
*/
- public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, String outerPrefix)
+ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList)
throws PbCodegenException {
String typeStr;
switch (fd.getJavaType()) {
@@ -90,12 +90,12 @@ public class PbCodegenUtils {
FieldDescriptor valueFd =
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
// key and value cannot be repeated
- String keyTypeStr = getTypeStrFromProto(keyFd, false, outerPrefix);
- String valueTypeStr = getTypeStrFromProto(valueFd, false, outerPrefix);
+ String keyTypeStr = getTypeStrFromProto(keyFd, false);
+ String valueTypeStr = getTypeStrFromProto(valueFd, false);
typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
} else {
// simple message
- typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
+ typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType());
}
break;
case INT:
@@ -108,7 +108,7 @@ public class PbCodegenUtils {
typeStr = "String";
break;
case ENUM:
- typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
+ typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
break;
case FLOAT:
typeStr = "Float";
@@ -174,11 +174,10 @@ public class PbCodegenUtils {
public static String pbDefaultValueCode(
FieldDescriptor fieldDescriptor, PbFormatContext pbFormatContext)
throws PbCodegenException {
- String outerPrefix = pbFormatContext.getOuterPrefix();
String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
switch (fieldDescriptor.getJavaType()) {
case MESSAGE:
- return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
+ return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType())
+ ".getDefaultInstance()";
case INT:
return "0";
@@ -187,7 +186,7 @@ public class PbCodegenUtils {
case STRING:
return "\"" + nullLiteral + "\"";
case ENUM:
- return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix)
+ return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType())
+ ".values()[0]";
case FLOAT:
return "0.0f";
@@ -229,9 +228,7 @@ public class PbCodegenUtils {
int uid = varUid.getAndIncrement();
String flinkElementVar = "elementVar" + uid;
PbCodegenAppender appender = new PbCodegenAppender(indent);
- String protoTypeStr =
- PbCodegenUtils.getTypeStrFromProto(
- elementPbFd, false, pbFormatContext.getOuterPrefix());
+ String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementPbFd, false);
String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementDataType);
appender.appendLine(protoTypeStr + " " + resultPbVar);
appender.begin("if(" + flinkArrDataVar + ".isNullAt(" + iVar + ")){");
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
index 84cd35c98cf..1f972bb5752 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java
@@ -28,25 +28,25 @@ import com.google.protobuf.ProtobufInternalUtils;
/** Protobuf function util. */
public class PbFormatUtils {
- public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
+ public static String getFullJavaName(Descriptors.Descriptor descriptor) {
if (null != descriptor.getContainingType()) {
// nested type
- String parentJavaFullName =
- getFullJavaName(descriptor.getContainingType(), outerProtoName);
+ String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
return parentJavaFullName + "." + descriptor.getName();
} else {
// top level message
+ String outerProtoName = getOuterProtoPrefix(descriptor.getFile());
return outerProtoName + descriptor.getName();
}
}
- public static String getFullJavaName(
- Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
+ public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
- return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
+ return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
+ String outerProtoName = getOuterProtoPrefix(enumDescriptor.getFile());
return outerProtoName + enumDescriptor.getName();
}
}
@@ -72,14 +72,46 @@ public class PbFormatUtils {
return ProtobufInternalUtils.underScoreToCamelCase(name, true);
}
- public static String getOuterProtoPrefix(String name) {
- name = name.replace('$', '.');
- int index = name.lastIndexOf('.');
- if (index != -1) {
- // include dot
- return name.substring(0, index + 1);
+ public static String getOuterClassName(Descriptors.FileDescriptor fileDescriptor) {
+ if (fileDescriptor.getOptions().hasJavaOuterClassname()) {
+ return fileDescriptor.getOptions().getJavaOuterClassname();
} else {
- return "";
+ String[] fileNames = fileDescriptor.getName().split("/");
+ String fileName = fileNames[fileNames.length - 1];
+ String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]);
+ // https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
+ // The name of the wrapper class is determined by converting the base name of the .proto
+ // file to camel case if the java_outer_classname option is not specified.
+ // For example, foo_bar.proto produces the class name FooBar. If there is a service,
+ // enum, or message (including nested types) in the file with the same name,
+ // "OuterClass" will be appended to the wrapper class's name.
+ boolean hasSameNameMessage =
+ fileDescriptor.getMessageTypes().stream()
+ .anyMatch(f -> f.getName().equals(outerName));
+ boolean hasSameNameEnum =
+ fileDescriptor.getEnumTypes().stream()
+ .anyMatch(f -> f.getName().equals(outerName));
+ boolean hasSameNameService =
+ fileDescriptor.getServices().stream()
+ .anyMatch(f -> f.getName().equals(outerName));
+ if (hasSameNameMessage || hasSameNameEnum || hasSameNameService) {
+ return outerName + PbConstant.PB_OUTER_CLASS_SUFFIX;
+ } else {
+ return outerName;
+ }
+ }
+ }
+
+ public static String getOuterProtoPrefix(Descriptors.FileDescriptor fileDescriptor) {
+ String javaPackageName =
+ fileDescriptor.getOptions().hasJavaPackage()
+ ? fileDescriptor.getOptions().getJavaPackage()
+ : fileDescriptor.getPackage();
+ if (fileDescriptor.getOptions().getJavaMultipleFiles()) {
+ return javaPackageName + ".";
+ } else {
+ String outerClassName = getOuterClassName(fileDescriptor);
+ return javaPackageName + "." + outerClassName + ".";
}
}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
similarity index 78%
rename from flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java
rename to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
index 32269393e17..bb83169d96c 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.protobuf;
import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
-import org.apache.flink.formats.protobuf.testproto.TestSimpleNoouterNomulti;
+import org.apache.flink.formats.protobuf.testproto.TestSimpleNomulti;
import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -39,18 +39,15 @@ import org.junit.Test;
*
* <p>It is valid proto definition.
*/
-public class MetaNoOuterNoMultiProtoToRowTest {
+public class MetaNoMultiProtoToRowTest {
@Test
public void testSimple() throws Exception {
RowType rowType =
PbToRowTypeUtil.generateRowType(
- TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor());
+ TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
PbFormatConfig formatConfig =
new PbFormatConfig(
- TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(),
- false,
- false,
- "");
+ TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, "");
new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig)
.open(null);
@@ -62,13 +59,10 @@ public class MetaNoOuterNoMultiProtoToRowTest {
public void testOuterClassName() throws Exception {
RowType rowType =
PbToRowTypeUtil.generateRowType(
- TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor());
+ TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
PbFormatConfig formatConfig =
new PbFormatConfig(
- TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(),
- false,
- false,
- "");
+ TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, "");
new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig)
.open(null);
new PbRowDataSerializationSchema(rowType, formatConfig).open(null);
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
index 7e70a5561b5..afcd3498bb0 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
@@ -45,10 +45,10 @@ public class MetaOuterNoMultiTest {
public void testSimple() throws Exception {
RowType rowType =
PbToRowTypeUtil.generateRowType(
- SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.getDescriptor());
+ SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.getDescriptor());
PbFormatConfig formatConfig =
new PbFormatConfig(
- SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.class.getName(),
+ SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.class.getName(),
false,
false,
"");
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
new file mode 100644
index 00000000000..b049f2fb907
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of proto same outer class name data to flink internal data. */
+public class SameOuterClassNameProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
+ .setA(1)
+ .setB(TestSameOuterClassNameOuterClass.FooBar.BAR)
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
+ testSameOuterClassName.toByteArray());
+
+ assertEquals(1, row.getInt(0));
+ assertEquals("BAR", row.getString(1).toString());
+ }
+
+ @Test
+ public void testIntEnum() throws Exception {
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
+ .setB(TestSameOuterClassNameOuterClass.FooBar.BAR)
+ .build();
+
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
+ testSameOuterClassName.toByteArray(),
+ true);
+ assertEquals(1, row.getInt(1));
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
new file mode 100644
index 00000000000..9e9e3f7fe69
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of flink row data to proto data nested in TestSameOuterClassNameOuterClass. */
+public class SameOuterClassNameRowToProtoTest {
+ @Test
+ public void testSimple() throws Exception {
+ RowData row = GenericRowData.of(1, StringData.fromString("BAR")); // a = 1, b given by name
+
+ byte[] bytes =
+ ProtobufTestHelper.rowToPbBytes(
+ row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class);
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
+ assertEquals(1, testSameOuterClassName.getA());
+ assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB());
+ }
+
+ @Test
+ public void testEnumAsInt() throws Exception {
+ RowData row = GenericRowData.of(1, 1); // second value is enum field b given as an int
+
+ byte[] bytes =
+ ProtobufTestHelper.rowToPbBytes(
+ row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, true); // NOTE(review): presumably the enum-as-int switch - confirm in helper
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
+ TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
+ assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB()); // int 1 is expected to map to BAR
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
index f7cfa331109..2409e60db7e 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
@@ -18,7 +18,8 @@
package org.apache.flink.formats.protobuf;
-import org.apache.flink.formats.protobuf.testproto.SimpleTest;
+import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti;
+import org.apache.flink.formats.protobuf.testproto.Status;
import org.apache.flink.table.data.RowData;
import com.google.protobuf.ByteString;
@@ -33,8 +34,8 @@ import static org.junit.Assert.assertTrue;
public class SimpleProtoToRowTest {
@Test
public void testSimple() throws Exception {
- SimpleTest simple =
- SimpleTest.newBuilder()
+ SimpleTestMulti simple =
+ SimpleTestMulti.newBuilder()
.setA(1)
.setB(2L)
.setC(false)
@@ -42,14 +43,15 @@ public class SimpleProtoToRowTest {
.setE(0.01)
.setF("haha")
.setG(ByteString.copyFrom(new byte[] {1}))
- .setH(SimpleTest.Corpus.IMAGES)
+ .setH(SimpleTestMulti.Corpus.IMAGES)
+ .setI(Status.FINISHED)
.setFAbc7D(1) // test fieldNameToJsonName
.setVpr6S(2)
.build();
- RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray());
+ RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
- assertEquals(10, row.getArity());
+ assertEquals(11, row.getArity());
assertEquals(1, row.getInt(0));
assertEquals(2L, row.getLong(1));
assertFalse(row.getBoolean(2));
@@ -58,14 +60,15 @@ public class SimpleProtoToRowTest {
assertEquals("haha", row.getString(5).toString());
assertEquals(1, (row.getBinary(6))[0]);
assertEquals("IMAGES", row.getString(7).toString());
- assertEquals(1, row.getInt(8));
- assertEquals(2, row.getInt(9));
+ assertEquals("FINISHED", row.getString(8).toString());
+ assertEquals(1, row.getInt(9));
+ assertEquals(2, row.getInt(10));
}
@Test
public void testNotExistsValueIgnoringDefault() throws Exception {
- SimpleTest simple =
- SimpleTest.newBuilder()
+ SimpleTestMulti simple =
+ SimpleTestMulti.newBuilder()
.setB(2L)
.setC(false)
.setD(0.1f)
@@ -73,7 +76,7 @@ public class SimpleProtoToRowTest {
.setF("haha")
.build();
- RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray());
+ RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
assertTrue(row.isNullAt(0));
assertFalse(row.isNullAt(1));
@@ -81,13 +84,13 @@ public class SimpleProtoToRowTest {
@Test
public void testDefaultValues() throws Exception {
- SimpleTest simple = SimpleTest.newBuilder().build();
+ SimpleTestMulti simple = SimpleTestMulti.newBuilder().build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
- SimpleTest.class,
+ SimpleTestMulti.class,
simple.toByteArray(),
- new PbFormatConfig(SimpleTest.class.getName(), false, true, ""),
+ new PbFormatConfig(SimpleTestMulti.class.getName(), false, true, ""),
false);
assertFalse(row.isNullAt(0));
@@ -98,6 +101,7 @@ public class SimpleProtoToRowTest {
assertFalse(row.isNullAt(5));
assertFalse(row.isNullAt(6));
assertFalse(row.isNullAt(7));
+ assertFalse(row.isNullAt(8));
assertEquals(10, row.getInt(0));
assertEquals(100L, row.getLong(1));
assertFalse(row.getBoolean(2));
@@ -105,13 +109,20 @@ public class SimpleProtoToRowTest {
assertEquals(0.0d, row.getDouble(4), 0.0001);
assertEquals("f", row.getString(5).toString());
assertArrayEquals(ByteString.EMPTY.toByteArray(), row.getBinary(6));
- assertEquals(SimpleTest.Corpus.UNIVERSAL.toString(), row.getString(7).toString());
+ assertEquals(SimpleTestMulti.Corpus.UNIVERSAL.toString(), row.getString(7).toString());
+ assertEquals(Status.UNSPECIFIED.toString(), row.getString(8).toString());
}
@Test
public void testIntEnum() throws Exception {
- SimpleTest simple = SimpleTest.newBuilder().setH(SimpleTest.Corpus.IMAGES).build();
- RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray(), true);
+ SimpleTestMulti simple =
+ SimpleTestMulti.newBuilder()
+ .setH(SimpleTestMulti.Corpus.IMAGES)
+ .setI(Status.STARTED)
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray(), true);
assertEquals(2, row.getInt(7));
+ assertEquals(1, row.getInt(8));
}
}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
index eccee930892..04b059b8bcc 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
@@ -18,7 +18,8 @@
package org.apache.flink.formats.protobuf;
-import org.apache.flink.formats.protobuf.testproto.SimpleTest;
+import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti;
+import org.apache.flink.formats.protobuf.testproto.Status;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -43,21 +44,23 @@ public class SimpleRowToProtoTest {
StringData.fromString("hello"),
new byte[] {1},
StringData.fromString("IMAGES"),
+ StringData.fromString("FINISHED"),
1,
2);
- byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class);
- SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
- assertTrue(simpleTest.hasA());
- assertEquals(1, simpleTest.getA());
- assertEquals(2L, simpleTest.getB());
- assertFalse(simpleTest.getC());
- assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTest.getD()));
- assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTest.getE()));
- assertEquals("hello", simpleTest.getF());
- assertEquals(1, simpleTest.getG().byteAt(0));
- assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH());
- assertEquals(1, simpleTest.getFAbc7D());
+ byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
+ SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+ assertTrue(simpleTestMulti.hasA());
+ assertEquals(1, simpleTestMulti.getA());
+ assertEquals(2L, simpleTestMulti.getB());
+ assertFalse(simpleTestMulti.getC());
+ assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTestMulti.getD()));
+ assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTestMulti.getE()));
+ assertEquals("hello", simpleTestMulti.getF());
+ assertEquals(1, simpleTestMulti.getG().byteAt(0));
+ assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
+ assertEquals(Status.FINISHED, simpleTestMulti.getI());
+ assertEquals(1, simpleTestMulti.getFAbc7D());
}
@Test
@@ -72,14 +75,16 @@ public class SimpleRowToProtoTest {
StringData.fromString("hello"),
null,
null,
+ null,
1,
2);
- byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class);
- SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
- assertFalse(simpleTest.hasA());
- assertFalse(simpleTest.hasG());
- assertFalse(simpleTest.hasH());
+ byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
+ SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+ assertFalse(simpleTestMulti.hasA());
+ assertFalse(simpleTestMulti.hasG());
+ assertFalse(simpleTestMulti.hasH());
+ assertFalse(simpleTestMulti.hasI());
}
@Test
@@ -87,10 +92,12 @@ public class SimpleRowToProtoTest {
RowData row =
GenericRowData.of(
null, null, null, null, null, null, null, 2, // CORPUS: IMAGE
+ 1, // STATUS: STARTED
null, null);
- byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class, true);
- SimpleTest simpleTest = SimpleTest.parseFrom(bytes);
- assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH());
+ byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class, true);
+ SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
+ assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
+ assertEquals(Status.STARTED, simpleTestMulti.getI());
}
}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
new file mode 100644
index 00000000000..e567a789da1
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of proto timestamp data with multiple_files options to flink internal data. */
+public class TimestampMultiProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ TimestampTestMulti timestampTestMulti =
+ TimestampTestMulti.newBuilder()
+ .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TimestampTestMulti.class, timestampTestMulti.toByteArray());
+
+ RowData rowData = row.getRow(0, 2); // field 0 (ts) is read as a nested row of 2 fields
+ assertEquals(1672498800, rowData.getLong(0)); // seconds
+ assertEquals(123, rowData.getInt(1)); // nanos
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
similarity index 50%
copy from flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
copy to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
index 06370f3fa6e..213f42661a2 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
@@ -18,21 +18,27 @@
package org.apache.flink.formats.protobuf;
-/** store config and common information. */
-public class PbFormatContext {
- private final String outerPrefix;
- private final PbFormatConfig pbFormatConfig;
+import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
- public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) {
- this.outerPrefix = outerPrefix;
- this.pbFormatConfig = pbFormatConfig;
- }
+import org.junit.Test;
- public String getOuterPrefix() {
- return outerPrefix;
- }
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test conversion of flink internal primitive data to proto timestamp data with multiple_files
+ * options.
+ */
+public class TimestampMultiRowToProtoTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
- public PbFormatConfig getPbFormatConfig() {
- return pbFormatConfig;
+ byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestMulti.class);
+ TimestampTestMulti timestampTestMulti = TimestampTestMulti.parseFrom(bytes);
+ assertEquals(1672498800, timestampTestMulti.getTs().getSeconds());
+ assertEquals(123, timestampTestMulti.getTs().getNanos());
}
}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
new file mode 100644
index 00000000000..55917933bc2
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of proto timestamp data to flink internal data. */
+public class TimestampNoMultiProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
+ TestTimestampNomulti.TimestampTestNoMulti.newBuilder()
+ .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TestTimestampNomulti.TimestampTestNoMulti.class,
+ timestampTestNoMulti.toByteArray());
+
+ RowData rowData = row.getRow(0, 2); // field 0 (ts) is read as a nested row of 2 fields
+ assertEquals(1672498800, rowData.getLong(0)); // seconds
+ assertEquals(123, rowData.getInt(1)); // nanos
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
new file mode 100644
index 00000000000..65cd877b2bf
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of flink internal primitive data to proto timestamp data. */
+public class TimestampNoMultiRowToProtoTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // ts as nested row: (seconds, nanos)
+
+ byte[] bytes =
+ ProtobufTestHelper.rowToPbBytes(
+ row, TestTimestampNomulti.TimestampTestNoMulti.class);
+ TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
+ TestTimestampNomulti.TimestampTestNoMulti.parseFrom(bytes);
+ assertEquals(1672498800, timestampTestNoMulti.getTs().getSeconds());
+ assertEquals(123, timestampTestNoMulti.getTs().getNanos());
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
new file mode 100644
index 00000000000..935c17c169b
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test conversion of proto timestamp data with multiple_files and outer_classname options to flink
+ * internal data.
+ */
+public class TimestampOuterMultiProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ TimestampTestOuterMulti timestampTestOuterMulti =
+ TimestampTestOuterMulti.newBuilder()
+ .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TimestampTestOuterMulti.class, timestampTestOuterMulti.toByteArray());
+
+ RowData rowData = row.getRow(0, 2); // field 0 (ts) is read as a nested row of 2 fields
+ assertEquals(1672498800, rowData.getLong(0)); // seconds
+ assertEquals(123, rowData.getInt(1)); // nanos
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
new file mode 100644
index 00000000000..1f27ed60f0f
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test conversion of flink internal primitive data to proto timestamp data with multiple_files and
+ * outer_classname options.
+ */
+public class TimestampOuterMultiRowToProtoTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // ts as nested row: (seconds, nanos)
+
+ byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestOuterMulti.class);
+ TimestampTestOuterMulti timestampTestOuterMulti = TimestampTestOuterMulti.parseFrom(bytes);
+ assertEquals(1672498800, timestampTestOuterMulti.getTs().getSeconds());
+ assertEquals(123, timestampTestOuterMulti.getTs().getNanos());
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
new file mode 100644
index 00000000000..5c2c08fe435
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.Timestamp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test conversion of proto timestamp data with outer_classname options to flink internal data. */
+public class TimestampOuterNoMultiProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
+ TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.newBuilder()
+ .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
+ .build();
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(
+ TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class,
+ timestampTestOuterNoMulti.toByteArray());
+
+ RowData rowData = row.getRow(0, 2); // field 0 (ts) is read as a nested row of 2 fields
+ assertEquals(1672498800, rowData.getLong(0)); // seconds
+ assertEquals(123, rowData.getInt(1)); // nanos
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
new file mode 100644
index 00000000000..208b49ab780
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test conversion of flink internal primitive data to proto timestamp data with outer_classname
+ * options.
+ */
+public class TimestampOuterNoMultiRowToProtoTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); // ts as nested row: (seconds, nanos)
+
+ byte[] bytes =
+ ProtobufTestHelper.rowToPbBytes(
+ row, TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class);
+ TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
+ TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.parseFrom(bytes);
+ assertEquals(1672498800, timestampTestOuterNoMulti.getTs().getSeconds());
+ assertEquals(123, timestampTestOuterNoMulti.getTs().getNanos());
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_map.proto b/flink-formats/flink-protobuf/src/test/proto/test_map.proto
index e22f771bfe3..cce7daa1143 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_map.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_map.proto
@@ -22,16 +22,14 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message MapTest {
- optional int32 a = 1;
- map<string, string> map1 = 2;
- map<string, InnerMessageTest> map2 = 3;
- map<string, bytes> map3 = 4;
+ optional int32 a = 1;
+ map<string, string> map1 = 2;
+ map<string, InnerMessageTest> map2 = 3;
+ map<string, bytes> map3 = 4;
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ message InnerMessageTest{
+ optional int32 a = 1;
+ optional int64 b = 2;
+ }
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
index ba0c320d35f..0829f26132b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto
@@ -22,21 +22,17 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message MultipleLevelMessageTest {
- optional int32 a = 1;
- optional int64 b = 2;
- optional bool c = 3;
- optional InnerMessageTest1 d = 4;
+ optional int32 a = 1;
+ optional int64 b = 2;
+ optional bool c = 3;
+ optional InnerMessageTest1 d = 4;
- message InnerMessageTest1{
- optional InnerMessageTest2 a = 1;
- optional bool c = 2;
- message InnerMessageTest2{
- optional int32 a =1;
- optional int64 b =2;
- }
+ message InnerMessageTest1{
+ optional InnerMessageTest2 a = 1;
+ optional bool c = 2;
+ message InnerMessageTest2{
+ optional int32 a = 1;
+ optional int64 b = 2;
}
+ }
}
-
-
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_null.proto b/flink-formats/flink-protobuf/src/test/proto/test_null.proto
index b17b15336b4..cd3d16d962c 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_null.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_null.proto
@@ -22,40 +22,38 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message NullTest {
- map<string, string> string_map = 1;
- map<int32, int32> int_map = 2;
- map<int64, int64> long_map = 3;
- map<bool, bool> boolean_map = 4;
- map<string, float> float_map = 5;
- map<string, double> double_map = 6;
- map<string, Corpus> enum_map = 7;
- map<string, InnerMessageTest> message_map = 8;
- map<string, bytes> bytes_map=9;
+ map<string, string> string_map = 1;
+ map<int32, int32> int_map = 2;
+ map<int64, int64> long_map = 3;
+ map<bool, bool> boolean_map = 4;
+ map<string, float> float_map = 5;
+ map<string, double> double_map = 6;
+ map<string, Corpus> enum_map = 7;
+ map<string, InnerMessageTest> message_map = 8;
+ map<string, bytes> bytes_map = 9;
- repeated string string_array = 10;
- repeated int32 int_array = 11;
- repeated int64 long_array = 12;
- repeated bool boolean_array = 13;
- repeated float float_array = 14;
- repeated double double_array = 15;
- repeated Corpus enum_array = 16;
- repeated InnerMessageTest message_array = 17;
- repeated bytes bytes_array = 18;
+ repeated string string_array = 10;
+ repeated int32 int_array = 11;
+ repeated int64 long_array = 12;
+ repeated bool boolean_array = 13;
+ repeated float float_array = 14;
+ repeated double double_array = 15;
+ repeated Corpus enum_array = 16;
+ repeated InnerMessageTest message_array = 17;
+ repeated bytes bytes_array = 18;
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ message InnerMessageTest{
+ optional int32 a = 1;
+ optional int64 b = 2;
+ }
- enum Corpus {
- UNIVERSAL = 0;
- WEB = 1;
- IMAGES = 2;
- LOCAL = 3;
- NEWS = 4;
- PRODUCTS = 5;
- VIDEO = 7;
- }
+ enum Corpus {
+ UNIVERSAL = 0;
+ WEB = 1;
+ IMAGES = 2;
+ LOCAL = 3;
+ NEWS = 4;
+ PRODUCTS = 5;
+ VIDEO = 7;
+ }
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
index 814761b2cec..b81cf39a31b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
@@ -22,8 +22,8 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message OneofTest {
- oneof test_oneof{
- int32 a = 1;
- int32 b = 2;
- }
+ oneof test_oneof{
+ int32 a = 1;
+ int32 b = 2;
+ }
}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
index b21ca840f65..bbc5f4df4b1 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto
@@ -22,32 +22,30 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message Pb3Test {
+ int32 a = 1;
+ int64 b = 2;
+ string c = 3;
+ float d = 4;
+ double e = 5;
+ Corpus f = 6;
+ InnerMessageTest g = 7;
+ repeated InnerMessageTest h = 8;
+ bytes i = 9;
+ map<string, string> map1 = 10;
+ map<string, InnerMessageTest> map2 = 11;
+
+ message InnerMessageTest{
int32 a = 1;
int64 b = 2;
- string c = 3;
- float d = 4;
- double e = 5;
- Corpus f = 6;
- InnerMessageTest g = 7;
- repeated InnerMessageTest h = 8;
- bytes i = 9;
- map<string, string> map1 = 10;
- map<string, InnerMessageTest> map2 = 11;
-
- message InnerMessageTest{
- int32 a =1;
- int64 b =2;
- }
+ }
- enum Corpus {
- UNIVERSAL = 0;
- WEB = 1;
- IMAGES = 2;
- LOCAL = 3;
- NEWS = 4;
- PRODUCTS = 5;
- VIDEO = 7;
- }
+ enum Corpus {
+ UNIVERSAL = 0;
+ WEB = 1;
+ IMAGES = 2;
+ LOCAL = 3;
+ NEWS = 4;
+ PRODUCTS = 5;
+ VIDEO = 7;
+ }
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
index 90cb9a58bed..75dff2ef78b 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto
@@ -22,11 +22,10 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message RepeatedTest {
- optional int32 a = 1;
- repeated int64 b = 2;
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6;
+ optional int32 a = 1;
+ repeated int64 b = 2;
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6;
}
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
index 2a73a9a111b..4e0e9694521 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
@@ -22,11 +22,9 @@ option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
message RepeatedMessageTest {
- repeated InnerMessageTest d = 4;
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ repeated InnerMessageTest d = 4;
+ message InnerMessageTest{
+ optional int32 a = 1;
+ optional int64 b = 2;
+ }
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
similarity index 86%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
index 814761b2cec..2941b6b65fe 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto
@@ -16,14 +16,16 @@
* limitations under the License.
*/
-syntax = "proto2";
+syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;
option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
-message OneofTest {
- oneof test_oneof{
- int32 a = 1;
- int32 b = 2;
- }
+message TestSameOuterClassName {
+ int32 a = 1;
+ FooBar b = 2;
+}
+
+enum FooBar {
+ FOO = 0;
+ BAR = 1;
}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
similarity index 60%
rename from flink-formats/flink-protobuf/src/test/proto/test_simple.proto
rename to flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
index e5d01f740c7..9ee527e0784 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto
@@ -21,29 +21,34 @@ package org.apache.flink.formats.protobuf.proto;
option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
-message SimpleTest {
- optional int32 a = 1 [default=10];
- optional int64 b = 2 [default=100];
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6 [default="f"];;
- optional bytes g = 7;
- optional Corpus h = 8;
- //this is must because protobuf have some field name parse bug if number is after "_".
- optional int32 f_abc_7d = 9;
- optional int32 vpr6s = 10;
-
- enum Corpus {
- UNIVERSAL = 0;
- WEB = 1;
- IMAGES = 2;
- LOCAL = 3;
- NEWS = 4;
- PRODUCTS = 5;
- VIDEO = 7;
- }
+message SimpleTestMulti {
+ optional int32 a = 1 [default = 10];
+ optional int64 b = 2 [default = 100];
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6 [default = "f"];
+ optional bytes g = 7;
+ optional Corpus h = 8;
+ optional Status i = 9;
+ // This field is a must: protobuf has a field-name parsing bug when a number follows "_".
+ optional int32 f_abc_7d = 10;
+ optional int32 vpr6s = 11;
+ enum Corpus {
+ UNIVERSAL = 0;
+ WEB = 1;
+ IMAGES = 2;
+ LOCAL = 3;
+ NEWS = 4;
+ PRODUCTS = 5;
+ VIDEO = 7;
+ }
}
-
+enum Status {
+ UNSPECIFIED = 0;
+ STARTED = 1;
+ RUNNING = 2;
+ FINISHED = 3;
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
index 334ec0d68e4..649400e0780 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto
@@ -21,26 +21,24 @@ package org.apache.flink.formats.protobuf.proto;
option java_multiple_files = true;
message SimpleTestNoJavaPackage {
- optional int32 a = 1 [default=10];
- optional int64 b = 2 [default=100];
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6 [default="f"];
- optional bytes g = 7;
- optional Corpus h = 8;
- //this is must because protobuf have some field name parse bug if number is after "_".
- optional int32 f_abc_7d = 9;
+ optional int32 a = 1 [default = 10];
+ optional int64 b = 2 [default = 100];
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6 [default = "f"];
+ optional bytes g = 7;
+ optional Corpus h = 8;
+ // This field is a must: protobuf has a field-name parsing bug when a number follows "_".
+ optional int32 f_abc_7d = 9;
- enum Corpus {
- UNIVERSAL = 0;
- WEB = 1;
- IMAGES = 2;
- LOCAL = 3;
- NEWS = 4;
- PRODUCTS = 5;
- VIDEO = 7;
- }
+ enum Corpus {
+ UNIVERSAL = 0;
+ WEB = 1;
+ IMAGES = 2;
+ LOCAL = 3;
+ NEWS = 4;
+ PRODUCTS = 5;
+ VIDEO = 7;
+ }
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
similarity index 73%
rename from flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto
rename to flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
index 379732d5503..f5ab90002b1 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto
@@ -19,20 +19,26 @@
syntax = "proto2";
package org.apache.flink.formats.protobuf.proto;
option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_multiple_files = false;
-message SimpleTestNoouterNomulti {
+message SimpleTestNoMulti {
+ optional int32 a = 1;
+ optional int64 b = 2;
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6;
+ optional bytes g = 7;
+ optional InnerMessageTest h = 8;
+ optional Result i = 9;
+
+ message InnerMessageTest{
optional int32 a = 1;
optional int64 b = 2;
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6;
- optional bytes g = 7;
- optional InnerMessageTest h = 8;
-
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ }
}
+enum Result {
+ SUCCESS = 0;
+ FAIL = 1;
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
index ca024b862d6..b47274a4ff2 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto
@@ -23,18 +23,17 @@ option java_outer_classname = "SimpleTestOuterMultiProto";
option java_multiple_files = true;
message SimpleTestOuterMulti {
+ optional int32 a = 1;
+ optional int64 b = 2;
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6;
+ optional bytes g = 7;
+ optional InnerMessageTest h = 8;
+
+ message InnerMessageTest{
optional int32 a = 1;
optional int64 b = 2;
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6;
- optional bytes g = 7;
- optional InnerMessageTest h = 8;
-
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ }
}
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
index 16459d27250..3185fcaa087 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto
@@ -20,20 +20,20 @@ syntax = "proto2";
package org.apache.flink.formats.protobuf.proto;
option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_outer_classname = "SimpleTestOuterNomultiProto";
+option java_multiple_files = false;
-message SimpleTestOuterNomulti {
+message SimpleTestOuterNoMulti {
+ optional int32 a = 1;
+ optional int64 b = 2;
+ optional bool c = 3;
+ optional float d = 4;
+ optional double e = 5;
+ optional string f = 6;
+ optional bytes g = 7;
+ optional InnerMessageTest h = 8;
+
+ message InnerMessageTest{
optional int32 a = 1;
optional int64 b = 2;
- optional bool c = 3;
- optional float d = 4;
- optional double e = 5;
- optional string f = 6;
- optional bytes g = 7;
- optional InnerMessageTest h = 8;
-
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+ }
}
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
similarity index 88%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
index 814761b2cec..f7da5c47e62 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto
@@ -16,14 +16,12 @@
* limitations under the License.
*/
-syntax = "proto2";
+syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
option java_package = "org.apache.flink.formats.protobuf.testproto";
option java_multiple_files = true;
-message OneofTest {
- oneof test_oneof{
- int32 a = 1;
- int32 b = 2;
- }
+message TimestampTestMulti {
+ google.protobuf.Timestamp ts = 1;
}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
similarity index 84%
copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
index 814761b2cec..6db2e9dbe15 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto
@@ -16,14 +16,12 @@
* limitations under the License.
*/
-syntax = "proto2";
+syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
+option java_multiple_files = false;
-message OneofTest {
- oneof test_oneof{
- int32 a = 1;
- int32 b = 2;
- }
+message TimestampTestNoMulti {
+ google.protobuf.Timestamp ts = 1;
}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
similarity index 83%
copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
index 2a73a9a111b..bc21d85f378 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto
@@ -16,17 +16,13 @@
* limitations under the License.
*/
-syntax = "proto2";
+syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "TimestampTestOuterMultiProto";
option java_multiple_files = true;
-message RepeatedMessageTest {
- repeated InnerMessageTest d = 4;
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+message TimestampTestOuterMulti {
+ google.protobuf.Timestamp ts = 1;
}
-
-
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
similarity index 79%
copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
index 2a73a9a111b..5b571e78a12 100644
--- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto
+++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto
@@ -16,17 +16,13 @@
* limitations under the License.
*/
-syntax = "proto2";
+syntax = "proto3";
package org.apache.flink.formats.protobuf.proto;
+import "google/protobuf/timestamp.proto";
option java_package = "org.apache.flink.formats.protobuf.testproto";
-option java_multiple_files = true;
+option java_outer_classname = "TimestampTestOuterNomultiProto";
+option java_multiple_files = false;
-message RepeatedMessageTest {
- repeated InnerMessageTest d = 4;
- message InnerMessageTest{
- optional int32 a =1;
- optional int64 b =2;
- }
+message TimestampTestOuterNoMulti {
+ google.protobuf.Timestamp ts = 1;
}
-
-