Posted to jira@kafka.apache.org by "Shaik Zakir Hussain (Jira)" <ji...@apache.org> on 2020/09/11 21:30:00 UTC

[jira] [Commented] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record

    [ https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194533#comment-17194533 ] 

Shaik Zakir Hussain commented on KAFKA-10477:
---------------------------------------------

I observed that the issue does not occur in Kafka *v2.3.0* (where the jackson dependency resolves to *v2.9.9*).

> Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10477
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0
>            Reporter: Shaik Zakir Hussain
>            Priority: Major
>
> A sink connector fails with a DataException when converting a Kafka record with an empty key to the Connect data format. 
> Kafka's trunk branch currently depends on *jackson v2.10.5*. 
> A short unit test (shared below) in the `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.  
> {code:java}
> @Test
> public void testToConnectDataEmptyKey() throws IOException {
>     // Disable schema envelopes and configure the converter as a key converter (isKey = true).
>     Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
>     converter.configure(props, true);
>     // An empty byte array stands in for a Kafka record with an empty key.
>     String str = "";
>     SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
>     System.out.println(schemaAndValue);
> }
> {code}
> This test code snippet fails with the following exception:
> {noformat}
> org.apache.kafka.connect.errors.DataException: Unknown schema type: null
> 	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
> 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
> 	at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> 	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> 	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {noformat}
>  
> This seems related to [https://github.com/FasterXML/jackson-databind/issues/2211] , where the jackson library started returning `MissingNode` for empty input passed to `ObjectMapper.readTree(input)`. The precise code change can be seen here: [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094] 
>  
> This causes an exception to be thrown in our JsonConverter class: [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764] 
>  
> In my opinion, when `jsonValue.getNodeType()` is `MISSING` ([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754] ), we should fall back to the behaviour of the `NULL` case (i.e. return null), although I am not sure what further repercussions this might have.
>  
> Things worked fine when the *jackson* dependency was at version *v2.9.10.3* or earlier, as `ObjectMapper` returned null in that case.
>  
> Thanks,
> Zakir
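
The fallback proposed in the description above (treating a `MISSING` node the same way as `NULL`) could look roughly like the sketch below. It is only an illustration and not the actual `JsonConverter` code: the class `MissingNodeSketch`, the `NodeType` enum, the `Node` class, and the `toConnectValue` method are all hypothetical stand-ins, with `NodeType` mirroring a few of the values of jackson's `JsonNodeType` so that the example runs without any dependency.

```java
/**
 * Hypothetical, dependency-free sketch of the proposed fallback.
 * None of these names exist in Kafka or jackson.
 */
final class MissingNodeSketch {

    /** Stand-in for a subset of jackson's JsonNodeType values. */
    enum NodeType { NULL, MISSING, STRING, NUMBER }

    /** Simplified stand-in for a parsed JSON node. */
    static final class Node {
        final NodeType type;
        final Object value;

        Node(NodeType type, Object value) {
            this.type = type;
            this.value = value;
        }
    }

    /**
     * Converts a node to a plain value. MISSING shares the NULL branch,
     * so empty input yields null instead of an "unknown type" error.
     */
    static Object toConnectValue(Node node) {
        if (node == null) {
            // Older behaviour described in the issue: the parser returned null.
            return null;
        }
        switch (node.type) {
            case NULL:
            case MISSING: // proposed: handle exactly like NULL
                return null;
            case STRING:
            case NUMBER:
                return node.value;
            default:
                throw new IllegalStateException("Unknown schema type: " + node.type);
        }
    }

    public static void main(String[] args) {
        // Empty input is modelled here as a MISSING node with no value.
        Node emptyKey = new Node(NodeType.MISSING, null);
        System.out.println(toConnectValue(emptyKey));
        System.out.println(toConnectValue(new Node(NodeType.STRING, "key-1")));
    }
}
```

Whether returning null for `MISSING` is safe for every caller of the real converter is the open question raised in the description; the sketch only shows the shape of the change.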



--
This message was sent by Atlassian Jira
(v8.3.4#803005)