Posted to jira@kafka.apache.org by "Randall Hauch (Jira)" <ji...@apache.org> on 2021/07/28 13:28:00 UTC

[jira] [Assigned] (KAFKA-10627) Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time

     [ https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch reassigned KAFKA-10627:
-------------------------------------

    Assignee: Joshua Grisham

> Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10627
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10627
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Joshua Grisham
>            Assignee: Joshua Grisham
>            Priority: Minor
>              Labels: connect-transformation, need-kip
>
> Some of the limitations of the *TimestampConverter* transform are causing issues for us, since we have many different producers from different systems writing events to some of our topics.  We try our best to enforce governance on the data formats, including strict use of Avro schemas, but the schemas still allow variation in how timestamp values are represented.
> In the end, multiple formats arrive in the same timestamp fields (for example, with and without milliseconds, with and without a timezone specifier, etc.).
> And then you get failed events in Connect with messages like this:
> {noformat}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
> 	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
> 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
> 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
> 	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
> 	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
> 	... 14 more
> Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
> 	at java.text.DateFormat.parse(DateFormat.java:366)
> 	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
> 	... 21 more
> {noformat}
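> The failure is reproducible outside of Connect: *SimpleDateFormat* is bound to a single pattern, so any input that deviates from it (here, a value with no millisecond part) is rejected. A minimal standalone sketch (the class name is illustrative, not from the Kafka code base):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class SimpleDateFormatFailureDemo {
    public static void main(String[] args) {
        // The same pattern TimestampConverter was configured with in the trace above.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
        try {
            // This value has no millisecond part, so the literal '.' in the
            // pattern cannot match and parsing fails with a ParseException.
            format.parse("2020-10-06T12:12:27Z");
        } catch (ParseException e) {
            System.out.println(e.getMessage()); // Unparseable date: "2020-10-06T12:12:27Z"
        }
    }
}
```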
>  
> My thinking is that maybe a good solution is to switch from *java.util.Date* to the *java.time* API, and from *SimpleDateFormat* to *DateTimeFormatter*, which allows more sophisticated patterns in the config that can match multiple different allowable formats.
> For example instead of effectively doing this:
> {code:java}
> SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
> It can be something like this:
> {code:java}
> DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]");{code}
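> As a rough illustration of the idea (a simplified pattern and a standalone demo class, not Connect code), the optional sections in square brackets let one configured pattern accept several input variants:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class MultiFormatDemo {
    public static void main(String[] args) {
        // Optional sections in [] make each piece of the pattern optional:
        // 'T' or space separator, fractional seconds, and zone offset.
        DateTimeFormatter format = DateTimeFormatter.ofPattern(
                "yyyy-MM-dd['T'][ ]HH:mm:ss[.SSS][XXX][X]");

        String[] values = {
                "2020-10-06T12:12:27Z",     // no millis, 'Z' zone specifier
                "2020-10-06T12:12:27.123",  // millis, no zone
                "2020-10-06 12:12:27"       // space separator, no millis/zone
        };
        for (String value : values) {
            // All three variants parse with the single pattern above.
            System.out.println(LocalDateTime.parse(value, format));
        }
    }
}
```

With *SimpleDateFormat*, each of these variants would need its own configured pattern (and its own transform instance); here one formatter handles all three.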
> Also, if there are multiple timestamp fields in the schema/events, today you have to chain multiple *TimestampConverter* transforms together, and I can see a performance impact when large events with many timestamp fields flow through a high-volume topic.
> So it would be great if the field name could instead be a comma-separated list of field names (much as with the *Cast*, *ReplaceField*, etc. transforms), with the transform looping through each field in the list and applying the same logic (parse the field and produce the requested output type).
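> To sketch the comma-separated field list idea (hypothetical helper names, not the actual transform code; the `convertTimestamp` placeholder just tags each value so the per-field loop is visible):

```java
import java.util.HashMap;
import java.util.Map;

public class MultiFieldSketch {
    // Hypothetical sketch: apply one conversion to several named fields,
    // mirroring how Cast and ReplaceField accept comma-separated field lists.
    static Map<String, Object> convertFields(Map<String, Object> value, String fieldsConfig) {
        Map<String, Object> updated = new HashMap<>(value);
        for (String field : fieldsConfig.split(",")) {
            String name = field.trim();
            if (updated.containsKey(name)) {
                updated.put(name, convertTimestamp(updated.get(name)));
            }
        }
        return updated;
    }

    // Placeholder for the transform's existing single-field parsing logic.
    static Object convertTimestamp(Object raw) {
        return "converted:" + raw;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("created_at", "2020-10-06T12:12:27Z");
        record.put("updated_at", "2020-10-06 12:12:27");
        record.put("id", 42);
        // Only the listed fields are converted; "id" passes through untouched.
        System.out.println(convertFields(record, "created_at, updated_at"));
    }
}
```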
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)