Posted to dev@kafka.apache.org by Maria Pilar <pi...@gmail.com> on 2018/01/18 20:56:22 UTC

Error sink parse Json

Hi everyone,

I'm trying to send events from a Kafka topic to Google Cloud Pub/Sub.

This is the sink JSON configuration I've used:



{
  "name":"CPSConnector",
  "config":{
    "connector.class":"com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max":"1",
    "topics":"STREAM-CUSTOMER-ACCOUNTS",
    "cps.topic":"test",
    "cps.project":"test-dev",
    "maxBufferSize":"10"
  }
}


and this is the converter configuration I have in the properties file
(basically the default settings):




key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false





but I'm getting this stack trace in the Connect log:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:304)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ac0e69cb': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ac0e69cb-5cab-4134-90c1-91ac70ce8b11"; line: 1, column: 10]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ac0e69cb': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ac0e69cb-5cab-4134-90c1-91ac70ce8b11"; line: 1, column: 10]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1798)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:673)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3527)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2622)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4030)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2559)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
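
For what it's worth, the [Source: ...] field in the trace suggests the bytes handed to the JSON parser were the bare text ac0e69cb-5cab-4134-90c1-91ac70ce8b11, which on its own is not a JSON document. The sketch below only illustrates that point using Python's standard json module, not the Jackson parser Connect actually uses. The helper name is_valid_json is made up for illustration, and it is an assumption that the record bytes are exactly what the trace prints.

```python
import json

# Record bytes as printed in the trace's [Source: ...] field (assumed to be
# the complete, unquoted content of the record key or value).
raw = b"ac0e69cb-5cab-4134-90c1-91ac70ce8b11"


def is_valid_json(data: bytes) -> bool:
    """Hypothetical helper: True if the bytes parse as a JSON document."""
    try:
        json.loads(data.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


# The bare UUID text is rejected by a JSON parser.
print(is_valid_json(raw))

# The same text wrapped in double quotes is a valid JSON string.
print(is_valid_json(b'"' + raw + b'"'))
```

If the key or value on the topic really is a plain string like this, one thing that might be worth trying (an assumption, not a confirmed fix) is setting the converter for that side to org.apache.kafka.connect.storage.StringConverter instead of JsonConverter.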




I'm not sure what configuration is needed to be able to send the events.


Thanks