Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/26 13:26:15 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

dawidwys commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r511842204



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -417,4 +431,11 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
 		}
 		return builder;
 	}
+
+	/** Returns schema with nullable true. */
+	private static Schema nullableSchema(Schema schema) {

Review comment:
       Could we stick to a single way of declaring a `Schema` nullable? With this PR we have two methods for the same purpose:
   * `nullableSchema`
   * `getNullableBuilder`
   
   Either use `nullableSchema` everywhere or use `getNullableBuilder(...).type(...)` everywhere.

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##########
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
 				DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
 			.build().toRowDataType().getLogicalType();
 		Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-		assertEquals("{\n" +
+		assertEquals("[ {\n" +

Review comment:
       I think it would be nice to add a test that we can convert back and forth between `DataType` and `Schema` with respect to the field names.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 					.record(rowName)
 					.fields();
 				for (int i = 0; i < rowType.getFieldCount(); i++) {
-					String fieldName = rowName + "_" + fieldNames.get(i);
+					String fieldName = fieldNames.get(i);
+					if (rowName.equals(fieldName)) {
+						// Can not build schema when the record and field have the same name
+						fieldName = rowName + "_" + fieldName;
+					}

Review comment:
       I think the correct solution will be:
   
   ```
   				RowType rowType = (RowType) logicalType;
   				List<String> fieldNames = rowType.getFieldNames();
   				// we have to make sure the record name is different in a Schema
   				SchemaBuilder.FieldAssembler<Schema> builder =
   						getNullableBuilder(logicalType)
   								.record(rowName)
   								.fields();
   				for (int i = 0; i < rowType.getFieldCount(); i++) {
   					String fieldName = fieldNames.get(i);
   					builder = builder
   						.name(fieldName)
   						.type(convertToSchema(rowType.getTypeAt(i), rowName + "_" + fieldName))
   						.noDefault();
   				}
   				return builder.endRecord();
   ```

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 					.record(rowName)
 					.fields();
 				for (int i = 0; i < rowType.getFieldCount(); i++) {
-					String fieldName = rowName + "_" + fieldNames.get(i);
+					String fieldName = fieldNames.get(i);
+					if (rowName.equals(fieldName)) {
+						// Can not build schema when the record and field have the same name
+						fieldName = rowName + "_" + fieldName;
+					}

Review comment:
       That is not correct. The builder does support fields with the same name at different nesting levels.
   
   What Avro does not support, and rightly so, is two record types that share a name but have different schemas. Therefore a schema like:
   
   ```
   {
       "type": "record",
       "name": "top",
       "fields": [
           {
               "name": "top",
               "type": {
                   "type": "record",
                   "name": "nested",
                   "fields": [
                       {"type": "string", "name": "top"}
                   ]
               }
           }
       ]
   }
   ```
   is valid and supported. However, if we change the name of the `nested` record to `top`, the schema becomes invalid:
   ```
   {
       "type": "record",
       "name": "top",
       "fields": [
           {
               "name": "top",
               "type": {
                   "type": "record",
                   "name": "top",
                   "fields": [
                       {"type": "string", "name": "top"}
                   ]
               }
           }
       ]
   }
   ```
   
   I think the core problem lies in how the `rowName` is generated. I think we should never adjust the `fieldName`; instead, we should append the `fieldName` to the `rowName` when naming the nested record.
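
The naming rule proposed above (leave every `fieldName` untouched and derive the nested record name as `rowName + "_" + fieldName`) can be sketched without Flink or Avro on the classpath. This is only a simplified model, not the converter itself: `RecordNameSketch`, `Row`, `recordNames` and `allDistinct` are hypothetical names made up for illustration, and a `null` field value stands in for any non-row type such as a string.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the proposed record naming; not Flink or Avro code. */
class RecordNameSketch {

    /** A row type: field name mapped to a nested row, or to null for a leaf type. */
    static final class Row {
        final Map<String, Row> fields = new LinkedHashMap<>();

        Row field(String name, Row nested) {
            fields.put(name, nested);
            return this;
        }
    }

    /** Collects the record names that would be generated for a row and its nested rows. */
    static List<String> recordNames(Row row, String rowName) {
        List<String> names = new ArrayList<>();
        collect(row, rowName, names);
        return names;
    }

    private static void collect(Row row, String rowName, List<String> out) {
        out.add(rowName);
        for (Map.Entry<String, Row> field : row.fields.entrySet()) {
            if (field.getValue() != null) {
                // The field keeps its own name; only the nested record name
                // is derived from the enclosing row name.
                collect(field.getValue(), rowName + "_" + field.getKey(), out);
            }
        }
    }

    /** True when no record name was generated twice. */
    static boolean allDistinct(List<String> names) {
        return new HashSet<>(names).size() == names.size();
    }

    public static void main(String[] args) {
        // Same shape as the JSON example: top { top: <record> { top: string } }
        Row nested = new Row().field("top", null);
        Row top = new Row().field("top", nested);
        List<String> names = recordNames(top, "top");
        System.out.println(names + " distinct=" + allDistinct(names));
        // prints: [top, top_top] distinct=true
    }
}
```

For the `top`/`top`/`top` shape from the example, the generated record names are `top` and `top_top`, while all field names stay `top`. The sketch does not show that the names are distinct for every possible row type; field names that themselves contain `_` could presumably still lead to equal record names.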
   
   BTW another shortcoming that I see is that we are losing the record name when converting from `Schema` to `DataType`. I think it is not a real issue though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org