Posted to issues@flink.apache.org by tragicjun <gi...@git.apache.org> on 2018/05/26 12:50:41 UTC
[GitHub] flink pull request #6082: [FLINK-9444] KafkaAvroTableSource failed to work f...
GitHub user tragicjun opened a pull request:
https://github.com/apache/flink/pull/6082
[FLINK-9444] KafkaAvroTableSource failed to work for map fields
## What is the purpose of the change
If an Avro schema contains map fields, registering the KafkaAvroTableSource throws an exception like the following:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type Map<String, String> of table field 'event' does not match with type GenericType<java.util.Map> of the field 'event' of the TableSource return type.
at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
```
This pull request adds a new unit test to expose the issue and then fixes it.
*Note: This implementation supports the following Avro primitive value types: string, int, long, float, double and boolean, which should cover most use cases.*
## Brief change log
- Add a new unit test "testHasMapFieldsAvroClass()" in KafkaAvroTableSourceTestBase
- Add some logic in "AvroTestUtils.createFlatAvroSchema()" to create Avro MapSchema
- Add some logic in "AvroRecordClassConverter.convertType()" to convert "GenericType<java.util.Map>" into "MapTypeInfo" with matching value types.
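The map conversion described in the change log can be sketched with the JDK alone. `AvroValueType`, `MapFieldConverter` and the string descriptors are hypothetical stand-ins for `org.apache.avro.Schema.Type` and Flink's `MapTypeInfo`, not the PR's code. The sketch mirrors only two rules: Avro map keys are always strings, and the value must be one of the six supported primitive types.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for the Avro primitive value types supported by this change.
enum AvroValueType { STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN }

final class MapFieldConverter {
    // Boxed Java class used for each supported primitive value type.
    private static final Map<AvroValueType, Class<?>> JAVA_CLASS = new EnumMap<>(AvroValueType.class);

    static {
        JAVA_CLASS.put(AvroValueType.STRING, String.class);
        JAVA_CLASS.put(AvroValueType.INT, Integer.class);
        JAVA_CLASS.put(AvroValueType.LONG, Long.class);
        JAVA_CLASS.put(AvroValueType.FLOAT, Float.class);
        JAVA_CLASS.put(AvroValueType.DOUBLE, Double.class);
        JAVA_CLASS.put(AvroValueType.BOOLEAN, Boolean.class);
    }

    private MapFieldConverter() {}

    // Describes the table type of an Avro map field; the key type is always String.
    static String describeMap(AvroValueType valueType) {
        Class<?> valueClass = JAVA_CLASS.get(valueType);
        return "Map<String, " + valueClass.getSimpleName() + ">";
    }
}
```

For a map with string values this yields `Map<String, String>`, the declared type from the exception above, instead of `GenericType<java.util.Map>`.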
## Verifying this change
This change can be verified as follows:
- Run the unit test "testHasMapFieldsAvroClass()" added to KafkaAvroTableSourceTestBase by this fix.
- Without the fix, the unit test fails with an exception similar to the one described above.
- With the fix applied, run the unit test again; it should pass.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tragicjun/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6082.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6082
----
commit 802e7e211b7bea6fd17b88a058591272f0fb215f
Author: jerryjzhang <zh...@...>
Date: 2018-05-16T16:27:32Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit b731f98ff3ca920883bc3c9daebb599c25049c0d
Author: jerryjzhang <zh...@...>
Date: 2018-05-17T03:00:58Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit f291a34debca992ea675b75ffdb4358dfbfa3b47
Author: jerryjzhang <zh...@...>
Date: 2018-05-19T07:06:24Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 61d2081ef7f8aa3669d9774da6149d4020d9581c
Author: jerryjzhang <zh...@...>
Date: 2018-05-19T07:34:20Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit d3d1afb710858b8f3cce988541ef3e805bd75b03
Author: jerryjzhang <zh...@...>
Date: 2018-05-23T15:01:10Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 112873fd9cca097db7948d8454a3d66c5dd2b32f
Author: tragicjun <zh...@...>
Date: 2018-05-23T15:06:32Z
Merge branch 'master' into master
commit e450e8b64c066339331e158e6d599b2599636d55
Author: jerryjzhang <zh...@...>
Date: 2018-05-23T15:55:51Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 33349a82b3547e09e845f3d1d844d80a0ed0c091
Author: jerryjzhang <zh...@...>
Date: 2018-05-24T13:22:13Z
revert FLINK-9384 changes
commit 8cc12a112ea673c3ec2794949b1a6ab63e855195
Author: jerryjzhang <zh...@...>
Date: 2018-05-24T13:24:49Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit d91a475a328e051f4717ec8b95be7adff92a3913
Author: jerryjzhang <zh...@...>
Date: 2018-05-26T08:15:17Z
Sync with upstream
commit 84ac010f0480342fa5fdf912d7fb10ff1f444900
Author: jerryjzhang <zh...@...>
Date: 2018-05-26T08:17:03Z
Sync with upstream
commit 5940fcbf7988a898a3e961f65d34f5711c17a5c4
Author: jerryjzhang <zh...@...>
Date: 2018-05-26T12:29:05Z
[FLINK-9444]KafkaAvroTableSource failed to work for map fields
----
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6082
In the end it seems that I had to rewrite this whole Avro logic in order to finally support all types and both specific and generic Avro records. I hope it is ok that I could not include your contribution. Please let me know if this big change also solved your issues.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6082
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
Thanks @suez1224 @twalthr for reviewing. I've moved the unit tests from KafkaAvroTableSourceTestBase to AvroRowDeSerializationSchemaTest. As for the other comments, please see my replies.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193092151
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
--- End diff --
This function requires "TypeInformation<?> extracted, Schema schema", but from an Avro MapSchema (value type) and ArraySchema (element type) we can only get an "org.apache.avro.Schema.Type".
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192771502
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
+ return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --
Call this function recursively. Btw also update the method docs about this behavior.
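The suggestion to recurse instead of calling `convertPrimitiveType` can be sketched as follows. `SchemaNode` is a hypothetical minimal schema tree rather than Avro's `Schema`, and `RecursiveConverter` returns type descriptors as plain strings instead of Flink `TypeInformation`. It only illustrates how recursion lets map values and array elements be containers themselves.

```java
// Hypothetical, minimal schema tree; not org.apache.avro.Schema.
final class SchemaNode {
    enum Kind { STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ARRAY, MAP }

    final Kind kind;
    final SchemaNode child; // element type for ARRAY, value type for MAP, null otherwise

    private SchemaNode(Kind kind, SchemaNode child) {
        this.kind = kind;
        this.child = child;
    }

    static SchemaNode primitive(Kind kind) {
        if (kind == Kind.ARRAY || kind == Kind.MAP) {
            throw new IllegalArgumentException("not a primitive: " + kind);
        }
        return new SchemaNode(kind, null);
    }

    static SchemaNode arrayOf(SchemaNode element) { return new SchemaNode(Kind.ARRAY, element); }

    static SchemaNode mapOf(SchemaNode value) { return new SchemaNode(Kind.MAP, value); }
}

final class RecursiveConverter {
    private RecursiveConverter() {}

    // Dispatches on the schema kind and recurses into element/value types of containers.
    static String convert(SchemaNode schema) {
        switch (schema.kind) {
            case ARRAY:
                return "ARRAY<" + convert(schema.child) + ">";
            case MAP:
                // Avro map keys are always strings, so only the value type recurses.
                return "MAP<STRING, " + convert(schema.child) + ">";
            default:
                return schema.kind.name();
        }
    }
}
```

Records would need one more case that recurses over the record's fields; it is omitted to keep the sketch short.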
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/6082
We treat sequences of values as arrays in SQL and the Table API. There are no built-in functions to handle lists. So we should return the values as an array, and hence don't need a List type.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193101536
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
--- End diff --
Do you actually mean the value **might** not be primitive?
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193102162
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
--- End diff --
It is necessary because List.class does not imply that the Schema.Type is ARRAY. But I think it would be better to use Schema.Type for this check.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193132257
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
--- End diff --
Could you explain more about this? I didn't find any coupling between AvroRecordClassConverter and AvroRow(De)SerializationSchema. But I did encounter a "UTF8<->String" cast problem during my integration, and I was not sure whether I should open a separate issue for it.
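A "UTF8<->String" cast problem typically arises because Avro readers, by default, return string values as `org.apache.avro.util.Utf8`, which implements `CharSequence` but is not a `java.lang.String`. A minimal sketch of normalizing such values follows. `StringNormalizer` is hypothetical, and the usage below uses `StringBuilder` as a stand-in for `Utf8` so that it runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: copies Avro string containers into java.lang.String-based ones.
final class StringNormalizer {
    private StringNormalizer() {}

    // Avro map keys are always strings; values are converted only if they are CharSequences.
    static Map<String, Object> normalizeMap(Map<? extends CharSequence, ?> avroMap) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<? extends CharSequence, ?> e : avroMap.entrySet()) {
            result.put(e.getKey().toString(), normalizeValue(e.getValue()));
        }
        return result;
    }

    // Converts CharSequence elements of an Avro array (exposed as a List) to String.
    static List<Object> normalizeList(List<?> avroArray) {
        List<Object> result = new ArrayList<>(avroArray.size());
        for (Object element : avroArray) {
            result.add(normalizeValue(element));
        }
        return result;
    }

    private static Object normalizeValue(Object value) {
        return value instanceof CharSequence ? value.toString() : value;
    }
}
```

Non-string values pass through unchanged, so the helper can be applied to maps and arrays of any primitive value type.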
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192771700
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
--- End diff --
Is this check necessary? If yes, why is it not necessary for Maps?
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192770567
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
--- End diff --
The value must not be primitive. Call this function recursively instead?
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
@twalthr Sure, please go ahead, and let me know if there is anything further I can help with.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/6082
Thanks for the PR, @tragicjun. I will take a look in the next few days.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
@fhueske Great, let me take a look and commit another version later.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192965275
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
--- End diff --
If you update this converter class, you should also update the corresponding runtime classes in `org.apache.flink.formats.avro.AvroRow(De)SerializationSchema`
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
In particular, I suggest that we add LIST to org.apache.flink.table.api.Types to support Avro array types. I can submit it in the next commits if you agree.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193137905
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
--- End diff --
If the value is not primitive (say, another record), how can we get the **TypeInformation extracted**? One solution is to take the full class name of the map value type, use reflection to load that class, and pass it to **convert(Class<T> avroClass)**. Any better ideas?
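The reflection approach proposed here might look like the following sketch. `RecordClassResolver` is a hypothetical helper. It assumes that the full name comes from the record schema (for example via Avro's `Schema.getFullName()`) and that a generated specific-record class with that name is on the classpath. Generic records have no such class, so this approach cannot cover them.

```java
// Hypothetical helper: loads the generated class for a record's full name so it can be
// handed to a class-based converter such as convert(Class<T> avroClass).
final class RecordClassResolver {
    private RecordClassResolver() {}

    static Class<?> resolve(String fullName, ClassLoader loader) {
        try {
            // initialize = false: only load the class, do not run its static initializers.
            return Class.forName(fullName, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("No generated class found for record " + fullName, e);
        }
    }
}
```

Passing the user-code class loader explicitly, rather than relying on the caller's loader, is a deliberate choice here. Flink jobs often load generated Avro classes through a separate class loader.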
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
@suez1224 @twalthr any update please?
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
I encountered another exception when working with string types in Avro maps/arrays. Any advice on whether I should open a separate issue or just reuse this one?
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192955182
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
+ return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --
I don't think Flink Table & SQL supports LIST; please see org.apache.flink.table.api.Types.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/6082
I think we have to return a typed array here. A `List` won't be supported by the built-in SQL functions.
There are a few tricks one can play to create typed arrays, even in static code, like
```
Object[] array = (Object[]) Array.newInstance(clazz, length);
```
Have a look at the code of the ORC InputFormat that had to solve a similar challenge: [OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java).
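The `Array.newInstance` trick can be expanded into a small helper that turns an Avro array (exposed as a `java.util.List`) into an array whose runtime component type matches the element class. `TypedArrays` is a hypothetical name, and the sketch assumes the element class is already known, for example from the Avro element schema.

```java
import java.lang.reflect.Array;
import java.util.List;

// Hypothetical helper: converts a List into an array whose runtime type is elementClass[],
// e.g. String[] rather than Object[]. elementClass must be a reference type, not int.class.
final class TypedArrays {
    private TypedArrays() {}

    static Object[] toTypedArray(List<?> values, Class<?> elementClass) {
        // Array.newInstance creates e.g. a String[], even though the static type is Object[].
        Object[] array = (Object[]) Array.newInstance(elementClass, values.size());
        for (int i = 0; i < values.size(); i++) {
            array[i] = values.get(i); // throws ArrayStoreException for an element of the wrong type
        }
        return array;
    }
}
```

Because the runtime type is `String[]`, code that later casts the value to `String[]` works, which an `Object[]` allocated with `new Object[n]` would not allow.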
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193168591
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
--- End diff --
I've implemented a reflection version, which now supports record type within map/array.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6082
Thanks for the update @tragicjun. I had a look at the changes. I think the logic still does not cover all the cases that we want to cover. Avro has the following types:
`RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL`
They should all be covered in the converter and the ser/deser schemas. Would it be OK for you if I try to simplify the logic a bit and build on top of your commits?
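One way to check that every type listed above is handled is a single table keyed by the Avro type, plus a completeness check. The enum below is a hypothetical copy of those 14 names (mirroring `org.apache.avro.Schema.Type`). The values describe the Java objects that Avro's generic API produces for each type, as we understand it. Treat the table as an assumption to verify against the Avro documentation, not as Flink's type mapping.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical copy of the Avro type names listed above (cf. org.apache.avro.Schema.Type).
enum AvroTypeName {
    RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL
}

final class AvroTypeCoverage {
    // Java objects produced by Avro's generic API for each type (assumption, see lead-in).
    static final Map<AvroTypeName, String> GENERIC_REPRESENTATION = new EnumMap<>(AvroTypeName.class);

    static {
        GENERIC_REPRESENTATION.put(AvroTypeName.RECORD, "org.apache.avro.generic.GenericRecord");
        GENERIC_REPRESENTATION.put(AvroTypeName.ENUM, "org.apache.avro.generic.GenericEnumSymbol");
        GENERIC_REPRESENTATION.put(AvroTypeName.ARRAY, "java.util.List (GenericData.Array)");
        GENERIC_REPRESENTATION.put(AvroTypeName.MAP, "java.util.Map");
        GENERIC_REPRESENTATION.put(AvroTypeName.UNION, "value of one of the branch types");
        GENERIC_REPRESENTATION.put(AvroTypeName.FIXED, "org.apache.avro.generic.GenericFixed");
        GENERIC_REPRESENTATION.put(AvroTypeName.STRING, "java.lang.CharSequence (Utf8 by default)");
        GENERIC_REPRESENTATION.put(AvroTypeName.BYTES, "java.nio.ByteBuffer");
        GENERIC_REPRESENTATION.put(AvroTypeName.INT, "java.lang.Integer");
        GENERIC_REPRESENTATION.put(AvroTypeName.LONG, "java.lang.Long");
        GENERIC_REPRESENTATION.put(AvroTypeName.FLOAT, "java.lang.Float");
        GENERIC_REPRESENTATION.put(AvroTypeName.DOUBLE, "java.lang.Double");
        GENERIC_REPRESENTATION.put(AvroTypeName.BOOLEAN, "java.lang.Boolean");
        GENERIC_REPRESENTATION.put(AvroTypeName.NULL, "null");
    }

    private AvroTypeCoverage() {}

    // Returns every Avro type without an entry; an empty set means all types are covered.
    static Set<AvroTypeName> uncovered() {
        Set<AvroTypeName> missing = EnumSet.allOf(AvroTypeName.class);
        missing.removeAll(GENERIC_REPRESENTATION.keySet());
        return missing;
    }
}
```

A unit test asserting that `uncovered()` is empty would fail as soon as a type is added to the enum without a matching entry.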
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
@twalthr @suez1224 would you please review on this?
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
Thanks @suez1224 @twalthr for reviewing. Please see my comments and the latest commits, which address your comments.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193039834
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+ } else if (genericTypeInfo.getTypeClass() == Map.class) {
+ // avro map key is always string
+ return Types.MAP(Types.STRING,
+ convertPrimitiveType(schema.getValueType().getType()));
+ } else if (genericTypeInfo.getTypeClass() == List.class &&
+ schema.getType() == Schema.Type.ARRAY) {
+ return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --
Yes, org.apache.flink.table.api.Types doesn't support LIST, but org.apache.flink.api.common.typeinfo.Types does. The Avro array type is converted to the Java List type. Can we add LIST to org.apache.flink.table.api.Types to support Avro arrays?
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193084082
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
+ @Test
+ public void testHasMapFieldsAvroClass() {
--- End diff --
The issue is exposed when using KafkaAvroTableSource, but moving the unit tests to AvroRowDeSerializationSchemaTest should be fine.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
Hi @fhueske ,
The Avro array type is actually mapped to the Java List type: specifically, the class **org.apache.avro.generic.GenericData.Array** extends **java.util.AbstractList**. I tried to convert the List to an array in **AvroRowDeserializationSchema**, but to keep it generic an **Object []** must be returned, which then leads to a cast problem when the **Object []** is passed to TypeSerializer.copy().
Using **ListTypeInfo** to declare the corresponding Avro array type works fine instead, because we already have **ListSerializer** to support it.
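The cast problem described here can be reproduced with the JDK alone. `ArrayList.toArray()` returns an array whose runtime type is `Object[]`, so it cannot be cast to `String[]`. By contrast, `toArray(T[])` allocates an array of the requested component type. `ArrayCastDemo` is hypothetical and does not involve Flink's `TypeSerializer`; it only shows the presumable root cause of the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of why an untyped Object[] breaks code that expects a String[].
final class ArrayCastDemo {
    private ArrayCastDemo() {}

    // Returns true if casting the untyped toArray() result to String[] fails.
    static boolean untypedCastFails(ArrayList<String> values) {
        Object[] untyped = values.toArray(); // runtime type is Object[] for java.util.ArrayList
        try {
            String[] strings = (String[]) untyped;
            return strings == null; // cast succeeded; strings is non-null, so this returns false
        } catch (ClassCastException e) {
            return true;
        }
    }

    // toArray(T[]) allocates an array with the requested runtime component type.
    static String[] typedCopy(List<String> values) {
        return values.toArray(new String[0]);
    }
}
```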
---
[GitHub] flink issue #6082: [FLINK-9444] KafkaAvroTableSource failed to work for map ...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
Please ignore previous commits, only the latest commit is relevant to this issue.
---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r192768664
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
+ @Test
+ public void testHasMapFieldsAvroClass() {
--- End diff --
I think we don't need changes in Kafka-related classes. This is an issue with the `AvroRowDeserializationSchema` and should be covered by the `AvroRowDeSerializationSchemaTest`.
---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:
https://github.com/apache/flink/pull/6082
The latest commit uses **Types.OBJECT_ARRAY** to map the Avro array type. Hence, Avro **GenericData.Array** values have to be converted to regular Java arrays when deserializing (see **AvroRowDeserializationSchema**) and back when serializing (see **AvroRowSerializationSchema**). Moreover, nested records within Avro maps/arrays are also supported.
The unit tests and my local integration tests have passed. Would you please review? @fhueske @twalthr @suez1224
---