You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Robin Moffatt (Jira)" <ji...@apache.org> on 2019/10/11 11:33:00 UTC

[jira] [Created] (KAFKA-9024) org.apache.kafka.connect.transforms.ValueToKey throws NPE

Robin Moffatt created KAFKA-9024:
------------------------------------

             Summary: org.apache.kafka.connect.transforms.ValueToKey throws NPE
                 Key: KAFKA-9024
                 URL: https://issues.apache.org/jira/browse/KAFKA-9024
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Robin Moffatt


If a field named in the SMT does not exist a NPE is thrown. This is not helpful to users and should be caught correctly and reported back in a more friendly way.

For example, importing data from a database with this transform: 

 
{code:java}
transforms = [ksqlCreateKey, ksqlExtractString]
transforms.ksqlCreateKey.fields = [ID]
transforms.ksqlCreateKey.type = class org.apache.kafka.connect.transforms.ValueToKey
transforms.ksqlExtractString.field = ID
transforms.ksqlExtractString.type = class org.apache.kafka.connect.transforms.ExtractField$Key
{code}
If the field name is {{id}} not {{ID}} then the task fails : 
{code:java}
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
   at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
   at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
   at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
   at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
   ... 11 more

{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)