Posted to issues@flink.apache.org by tragicjun <gi...@git.apache.org> on 2018/05/26 12:50:41 UTC

[GitHub] flink pull request #6082: [FLINK-9444] KafkaAvroTableSource failed to work f...

GitHub user tragicjun opened a pull request:

    https://github.com/apache/flink/pull/6082

    [FLINK-9444] KafkaAvroTableSource failed to work for map fields

    ## What is the purpose of the change
    
    When an Avro schema has map fields, registering the KafkaAvroTableSource throws an exception like:
    
    ```
    Exception in thread "main" org.apache.flink.table.api.ValidationException: Type Map<String, String> of table field 'event' does not match with type GenericType<java.util.Map> of the field 'event' of the TableSource return type.
    at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
    at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
    at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
    at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
    ```
    
    This pull request adds a new unit test to expose the issue and then fixes it. 
    
    *Note: This implementation supports the following Avro primitive value types: string, int, long, float, double and boolean, which should cover most use cases.*
    
    ## Brief change log
    
      - Add a new unit test "testHasMapFieldsAvroClass()" in KafkaAvroTableSourceTestBase
      - Add some logic in "AvroTestUtils.createFlatAvroSchema()" to create Avro MapSchema
      - Add some logic in "AvroRecordClassConverter.convertType()" to convert "GenericType<java.util.Map>"  into "MapTypeInfo" with matching value types.
    
    ## Verifying this change
    
    This change can be verified as follows:
    - Run the unit test "testHasMapFieldsAvroClass()", added to KafkaAvroTableSourceTestBase by this pull request, without the fix applied.
    - The test fails with an exception similar to the one shown above.
    - Apply the fix and run the test again; it should pass.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tragicjun/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6082
    
----
commit 802e7e211b7bea6fd17b88a058591272f0fb215f
Author: jerryjzhang <zh...@...>
Date:   2018-05-16T16:27:32Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit b731f98ff3ca920883bc3c9daebb599c25049c0d
Author: jerryjzhang <zh...@...>
Date:   2018-05-17T03:00:58Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit f291a34debca992ea675b75ffdb4358dfbfa3b47
Author: jerryjzhang <zh...@...>
Date:   2018-05-19T07:06:24Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 61d2081ef7f8aa3669d9774da6149d4020d9581c
Author: jerryjzhang <zh...@...>
Date:   2018-05-19T07:34:20Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit d3d1afb710858b8f3cce988541ef3e805bd75b03
Author: jerryjzhang <zh...@...>
Date:   2018-05-23T15:01:10Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 112873fd9cca097db7948d8454a3d66c5dd2b32f
Author: tragicjun <zh...@...>
Date:   2018-05-23T15:06:32Z

    Merge branch 'master' into master

commit e450e8b64c066339331e158e6d599b2599636d55
Author: jerryjzhang <zh...@...>
Date:   2018-05-23T15:55:51Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 33349a82b3547e09e845f3d1d844d80a0ed0c091
Author: jerryjzhang <zh...@...>
Date:   2018-05-24T13:22:13Z

    revert FLINK-9384 changes

commit 8cc12a112ea673c3ec2794949b1a6ab63e855195
Author: jerryjzhang <zh...@...>
Date:   2018-05-24T13:24:49Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit d91a475a328e051f4717ec8b95be7adff92a3913
Author: jerryjzhang <zh...@...>
Date:   2018-05-26T08:15:17Z

    Sync with upstream

commit 84ac010f0480342fa5fdf912d7fb10ff1f444900
Author: jerryjzhang <zh...@...>
Date:   2018-05-26T08:17:03Z

    Sync with upstream

commit 5940fcbf7988a898a3e961f65d34f5711c17a5c4
Author: jerryjzhang <zh...@...>
Date:   2018-05-26T12:29:05Z

    [FLINK-9444]KafkaAvroTableSource failed to work for map fields

----


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    In the end it seems that I had to rewrite this whole Avro logic in order to finally support all types and both specific and generic Avro records. I hope it is ok that I could not include your contribution. Please let me know if this big change also solved your issues.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6082


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Thanks @suez1224 @twalthr for reviewing. I've moved the unit tests from KafkaAvroTableSourceTestBase to AvroRowDeSerializationSchemaTest. As for the other comments, please see my replies.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193092151
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --
    
    This function requires "TypeInformation<?> extracted, Schema schema" , but we can only get "org.apache.avro.Schema.Type" from Avro MapSchema (value type) and ArraySchema (element type). 


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192771502
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    +			} else if (genericTypeInfo.getTypeClass() == List.class &&
    +				schema.getType() == Schema.Type.ARRAY) {
    +				return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --
    
    Call this function recursively. Btw also update the method docs about this behavior.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    We treat sequences of values as arrays in SQL and the Table API. There are no built-in functions to handle lists. So we should return the values as an array, and hence don't need a List type.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193101536
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --
    
    do you actually mean the value **might** not be primitive?


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193102162
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    +			} else if (genericTypeInfo.getTypeClass() == List.class &&
    +				schema.getType() == Schema.Type.ARRAY) {
    --- End diff --
    
    It is necessary because List.class doesn't imply that the Schema.Type is ARRAY. But I think it would be better to use Schema.Type for the check.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193132257
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    --- End diff --
    
    Could you explain more about this? I didn't find any coupling between AvroRecordClassConverter and AvroRow(De)SerializationSchema. But I did encounter a "UTF8<->String" cast problem during my integration, and I was not sure whether I should open a separate issue for it.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192771700
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    +			} else if (genericTypeInfo.getTypeClass() == List.class &&
    +				schema.getType() == Schema.Type.ARRAY) {
    --- End diff --
    
    Is this check necessary? If yes, why is it not necessary for Maps?


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192770567
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --
    
    The value must not be primitive. Call this function recursively instead?


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    @twalthr Sure, please go ahead and let me know if there is anything further I can help with.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Thanks for the PR, @tragicjun. I will take a look in the next few days.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    @fhueske Great, let me take a look and commit another version later. 


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192965275
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    --- End diff --
    
    If you update this converter class, you should also update the corresponding runtime classes in `org.apache.flink.formats.avro.AvroRow(De)SerializationSchema`


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    In particular, I suggest that we add LIST to org.apache.flink.table.api.Types to support Avro array types. I can submit it in the next commits if you agree.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193137905
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --
    
    If the value is not primitive, say another record, how could we get the **TypeInformation extracted**? One solution is to get the full class name of the map value type, load that class via reflection, and pass it to **convert(Class<T> avroClass)**. Any better ideas?
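
A minimal sketch of the reflection idea raised in this comment, using only the JDK. `ClassLookup` and `resolve` are hypothetical names, not part of Flink or Avro. In the real converter the class name would presumably come from the map's value schema, and the resolved class would then be handed to `convert(Class<T> avroClass)`.

```java
/** Hypothetical helper for illustration; not part of Flink or Avro. */
final class ClassLookup {

    private ClassLookup() {}

    /**
     * Loads a class by its fully qualified name and checks that it is a
     * subtype of expectedType before returning it.
     */
    static <T> Class<? extends T> resolve(String fullName, Class<T> expectedType) {
        try {
            Class<?> candidate = Class.forName(fullName);
            // asSubclass throws ClassCastException if candidate is not a subtype.
            return candidate.asSubclass(expectedType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No class on the classpath named " + fullName, e);
        }
    }
}
```

One drawback of this approach is that it only works when a generated class for the nested record is on the classpath, so it would not help with purely generic records.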


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    @suez1224 @twalthr any update please?


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    I encountered another exception when working with string types in Avro maps/arrays. Any advice on whether I should open a separate issue or just reuse this one?


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192955182
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    +			} else if (genericTypeInfo.getTypeClass() == List.class &&
    +				schema.getType() == Schema.Type.ARRAY) {
    +				return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --
    
    I don't think Flink Table & SQL supports LIST; please see org.apache.flink.table.api.Types.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    I think we have to return a typed array here. A `List` won't be supported by the built-in SQL functions.
    
    There are a few tricks one can play to create typed arrays, even in static code, like
    
    ```
    Object[] array = (Object[]) Array.newInstance(clazz, length);
    ```
    
    Have a look at the code of the ORC InputFormat that had to solve a similar challenge: [OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java).
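
The `Array.newInstance` trick above can be sketched with the JDK alone. `TypedArrays` and `toTypedArray` are hypothetical names for illustration; this is not the code from the PR or from the ORC reader.

```java
import java.lang.reflect.Array;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink or Avro. */
final class TypedArrays {

    private TypedArrays() {}

    /**
     * Copies a list into an array whose runtime component type is elementClass.
     * elementClass must be a reference type (Integer.class, not int.class),
     * because a primitive array cannot be cast to Object[].
     */
    static Object[] toTypedArray(List<?> values, Class<?> elementClass) {
        // Array.newInstance returns Object; the cast holds for reference component types.
        Object[] array = (Object[]) Array.newInstance(elementClass, values.size());
        for (int i = 0; i < values.size(); i++) {
            // Throws ArrayStoreException if an element is not an instance of elementClass.
            array[i] = values.get(i);
        }
        return array;
    }
}
```

For example, `toTypedArray(Arrays.asList(1, 2, 3), Integer.class)` yields an array whose runtime class is `Integer[]`, so a later cast to `Integer[]` succeeds even though the static return type is `Object[]`.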


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193168591
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --
    
    I've implemented a reflection version, which now supports record type within map/array.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Thanks for the update @tragicjun. I had a look at the changes. I think the logic still does not cover all the cases that we want to cover. Avro has the following types:
    `RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL`
    And they should all be covered in the converter and ser/deser schemes. Would it be ok for you if I try to simplify the logic a bit and build on top of your commits?
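
To illustrate what covering every type recursively could look like, here is a toy sketch that does not depend on Avro or Flink. `ToySchema` and `ToyConverter` are hypothetical stand-ins, and the target type names (for example mapping ENUM to STRING, or a union of null and one other type to that other type) are assumptions made for the example, not necessarily what the rewritten converter does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a schema node; not the real org.apache.avro.Schema. */
final class ToySchema {

    enum Kind {
        RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES,
        INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL
    }

    final Kind kind;
    // Record fields, union branches, the array element type, or the map value type.
    final List<ToySchema> children;

    ToySchema(Kind kind, ToySchema... children) {
        this.kind = kind;
        this.children = Arrays.asList(children);
    }
}

/** Hypothetical converter that renders a schema as a readable type description. */
final class ToyConverter {

    private ToyConverter() {}

    static String convert(ToySchema schema) {
        switch (schema.kind) {
            case RECORD: {
                List<String> fields = new ArrayList<>();
                for (ToySchema field : schema.children) {
                    fields.add(convert(field)); // recurse into each field
                }
                return "ROW<" + String.join(", ", fields) + ">";
            }
            case ARRAY:
                return "ARRAY<" + convert(schema.children.get(0)) + ">";
            case MAP:
                // Map keys are always strings; only the value type recurses.
                return "MAP<STRING, " + convert(schema.children.get(0)) + ">";
            case UNION: {
                // Assumption: a union with exactly one non-null branch is a nullable T.
                List<ToySchema> nonNull = new ArrayList<>();
                for (ToySchema branch : schema.children) {
                    if (branch.kind != ToySchema.Kind.NULL) {
                        nonNull.add(branch);
                    }
                }
                return nonNull.size() == 1 ? convert(nonNull.get(0)) : "GENERIC<Object>";
            }
            case ENUM:
            case STRING:
                return "STRING";
            case FIXED:
            case BYTES:
                return "BYTES";
            case INT:
                return "INT";
            case LONG:
                return "LONG";
            case FLOAT:
                return "FLOAT";
            case DOUBLE:
                return "DOUBLE";
            case BOOLEAN:
                return "BOOLEAN";
            case NULL:
                return "VOID";
            default:
                throw new IllegalArgumentException("Unsupported kind: " + schema.kind);
        }
    }
}
```

Because the MAP and ARRAY branches call `convert` on their child schema, a map whose values are arrays of ints comes out as `MAP<STRING, ARRAY<INT>>` without any special case for non-primitive values.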


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    @twalthr @suez1224 would you please review this?


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Thanks @suez1224 @twalthr for reviewing. Please see my replies and the latest commits addressing your comments.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193039834
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
     			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
     			if (genericTypeInfo.getTypeClass() == Utf8.class) {
     				return BasicTypeInfo.STRING_TYPE_INFO;
    +			} else if (genericTypeInfo.getTypeClass() == Map.class) {
    +				// avro map key is always string
    +				return Types.MAP(Types.STRING,
    +					convertPrimitiveType(schema.getValueType().getType()));
    +			} else if (genericTypeInfo.getTypeClass() == List.class &&
    +				schema.getType() == Schema.Type.ARRAY) {
    +				return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --
    
    Yes, org.apache.flink.table.api.Types doesn't support LIST, but org.apache.flink.api.common.typeinfo.Types does. The Avro array type would be converted to the Java List type. Can we add LIST to org.apache.flink.table.api.Types to support Avro arrays?


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193084082
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
    @@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
     			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
     	}
     
    +	@Test
    +	public void testHasMapFieldsAvroClass() {
    --- End diff --
    
    The issue is exposed when using KafkaAvroTableSource, but moving the unit tests to AvroRowDeSerializationSchemaTest should be fine.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Hi @fhueske , 
    
    The Avro array type is actually mapped to the Java List type; specifically, the class **org.apache.avro.generic.GenericData.Array** extends **java.util.AbstractList**. I tried to convert the List to an array in **AvroRowDeserializationSchema**, but to make it generic an **Object[]** must be returned, which would then lead to a cast problem when passing the **Object[]** to TypeSerializer.copy().
    
    I tried using **ListTypeInfo** to declare the corresponding Avro array type, and it worked fine, since we already have **ListSerializer** to support it.
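
The cast problem described here is presumably of the following kind, shown with plain JDK collections instead of Avro and Flink classes (an assumption; the thread does not include the failing code). `ObjectArrayCast` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demonstration class; not part of Flink or Avro. */
final class ObjectArrayCast {

    private ObjectArrayCast() {}

    /** Returns true if casting the untyped toArray() result to String[] fails. */
    static boolean untypedCopyFailsCast(List<String> values) {
        // ArrayList.toArray() returns an array whose runtime type is Object[].
        Object[] untyped = new ArrayList<>(values).toArray();
        try {
            String[] ignored = (String[]) untyped;
            return false;
        } catch (ClassCastException e) {
            // Every element is a String, but the array itself is still an Object[].
            return true;
        }
    }

    /** Copies the list into an array whose runtime type really is String[]. */
    static String[] typedCopy(List<String> values) {
        return values.toArray(new String[0]);
    }
}
```

The element types are irrelevant to the cast; what matters is the runtime component type of the array object, which is why a serializer that expects a specific array class would reject a generic `Object[]`.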


---

[GitHub] flink issue #6082: [FLINK-9444] KafkaAvroTableSource failed to work for map ...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    Please ignore previous commits, only the latest commit is relevant to this issue.


---

[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192768664
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
    @@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
     			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
     	}
     
    +	@Test
    +	public void testHasMapFieldsAvroClass() {
    --- End diff --
    
    I think we don't need changes in Kafka-related classes. This is an issue with the `AvroRowDeserializationSchema` and should be covered by the `AvroRowDeSerializationSchemaTest`.


---

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

Posted by tragicjun <gi...@git.apache.org>.
Github user tragicjun commented on the issue:

    https://github.com/apache/flink/pull/6082
  
    The latest commit uses **Types.OBJECT_ARRAY** to map the Avro array type. Hence, Avro **GenericData.Array** values have to be converted to regular Java arrays (see **AvroRowDeserializationSchema**) and back (see **AvroRowSerializationSchema**). Moreover, nested records within Avro maps/arrays are also supported.
    
    The unit tests and my local integration tests have passed. Would you please review? @fhueske @twalthr @suez1224 


---