Posted to jira@kafka.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/10/03 19:57:00 UTC

[jira] [Commented] (KAFKA-6290) Kafka Connect cast transformation should support logical types

    [ https://issues.apache.org/jira/browse/KAFKA-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943861#comment-16943861 ] 

ASF GitHub Bot commented on KAFKA-6290:
---------------------------------------

rhauch commented on pull request #7371: KAFKA-6290: Support casting from logical types in cast transform
URL: https://github.com/apache/kafka/pull/7371
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka Connect cast transformation should support logical types
> --------------------------------------------------------------
>
>                 Key: KAFKA-6290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6290
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Sudhir Pradhan
>            Assignee: Nigel Liang
>            Priority: Major
>              Labels: confluent-kafka, confluentic, connect, connect-api, connect-transformation, kafka, kafka-connect, transform
>
> I am facing the same issue when consuming from Kafka to HDFS with the Cast transform. Any pointers, please?
> My connector:
> *********************
> {code:java}
> {
>  "name": "hdfs-sink-avro-cast-test-stndln",
>  "config": {
>   "key.converter": "io.confluent.connect.avro.AvroConverter",
>   "key.converter.schema.registry.url": "http://localhost:8081",
>   "value.converter": "io.confluent.connect.avro.AvroConverter",
>   "value.converter.schema.registry.url": "http://localhost:8081",
>   "key.converter.schemas.enable": "true",
>   "value.converter.schemas.enable": "true",
>   "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
>   "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
>   "internal.key.converter.schemas.enable": "false",
>   "internal.value.converter.schemas.enable": "false",
>   "offset.storage.file.filename": "/tmp/connect.offsets.avroHdfsConsumer.casttest.stndln",
>   "offset.flush.interval.ms": "500",
>   "parse.key": "true",
>   "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>   "hadoop.home": "/usr/lib/hadoop",
>   "hdfs.url": "hdfs://ip-127-34-56-789.us-east-1.compute.interna:8020",
>   "topics": "avro_raw_KFK_SRP_USER_TEST_V,avro_raw_KFK_SRP_PG_HITS_TEST_V",
>   "tasks.max": "1",
>   "topics.dir": "/home/hadoop/kafka/data/streams/in/raw/casttest1",
>   "logs.dir": "/home/hadoop/kafka/wal/streams/in/raw/casttest1",
>   "hive.integration": "true",
>   "hive.metastore.uris": "thrift://ip-127-34-56-789.us-east-1.compute.internal:9083",
>   "schema.compatibility": "BACKWARD",
>   "flush.size": "10000",
>   "rotate.interval.ms": "1000",
>   "mode": "timestamp",
>   "transforms": "Cast",
>   "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
>   "transforms.Cast.spec": "residuals:float64,comp:float64"
>  }
> }
> {code}
> Exception:
> *************
> {code:java}
> [2017-11-16 01:14:39,719] ERROR Task hdfs-sink-avro-cast-test-stndln-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "null"
>         at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
>         at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
>         at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
>         at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
>         at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
>         at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:414)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2017-11-16 01:14:39,719] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
> [2017-11-16 01:14:39,719] INFO Shutting down Hive executor service. (io.confluent.connect.hdfs.DataWriter:309)
> {code}
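
The exception above reports a java.util.Date value being validated against a plain INT64 schema. java.util.Date is the Java representation Connect uses for its Timestamp logical type, whose underlying schema type is INT64 (milliseconds since the epoch). The following is a minimal, stdlib-only sketch of the idea in the issue title: unwrap a logical-type value to its primitive form before casting. It does not use the Kafka Connect API and is not the code from the pull request; the class and method names (LogicalTypeCastSketch, toPrimitive, castToFloat64) are hypothetical, and only the Timestamp case is modelled.

```java
import java.util.Date;

public class LogicalTypeCastSketch {

    // Hypothetical helper, not part of Kafka Connect: unwrap a logical-type
    // Java object into the primitive its underlying schema type expects.
    // Assumption: a Date here stands for Connect's Timestamp logical type
    // (INT64 epoch milliseconds). Connect's Date and Time logical types also
    // use java.util.Date but are INT32-backed and would need other handling.
    public static Object toPrimitive(Object value) {
        if (value instanceof Date) {
            return ((Date) value).getTime();
        }
        return value;
    }

    // Cast a value to float64 (a Java double) after unwrapping logical types.
    public static double castToFloat64(Object value) {
        Object primitive = toPrimitive(value);
        if (primitive instanceof Number) {
            return ((Number) primitive).doubleValue();
        }
        throw new IllegalArgumentException(
                "Unsupported value for float64 cast: " + value);
    }

    public static void main(String[] args) {
        // A Date is unwrapped to epoch milliseconds before the numeric cast.
        System.out.println(castToFloat64(new Date(1510794879719L)));
        // A plain long passes through the same path unchanged.
        System.out.println(castToFloat64(42L));
    }
}
```

Without the unwrapping step, the Date object itself would be passed on where a Long is expected, which matches the type mismatch named in the DataException message.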



--
This message was sent by Atlassian Jira
(v8.3.4#803005)