You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2018/06/05 06:12:27 UTC
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192955182
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
+ return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --
I dont think Flink Table & SQL support LIST, please see org.apache.flink.table.api.Types.
---