Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/25 04:36:34 UTC
[GitHub] [pulsar] dennisylyung edited a comment on issue #6034: [Pulsar IO][Issue 5633]Support avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#issuecomment-603635083
I built and ran it.
When I use JsonConverter ("org.apache.kafka.connect.json.JsonConverter"), I get the following error:
```
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 tables in 00:00:00.228
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read locks to enable MySQL writes
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a total of 00:00:02.03
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
12:29:56.095 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.336 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.634 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Key schemas or Value schemas are different schema type, from key schema type is BYTES and to key schema is JSON, from value schema is BYTES and to value schema is JSON
at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_221]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) [?:1.8.0_221]
at org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733) [pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329) [pulsar-common.jar:2.6.0-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
```
I made sure to delete all existing schemas before running, so the schema error does not stem from schemas left over from previous runs.
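To make the mismatch in the broker message easier to read, here is a minimal sketch that pulls the "from" and "to" schema types out of the exception text. It only parses the wording seen in the log above. The function name `parse_schema_mismatch` and the group names are hypothetical, and nothing here is part of Pulsar or of this PR.

```python
import re
from typing import Dict, Optional

# Pattern copied from the wording of the broker message in the log above.
# Assumption: other Pulsar versions may phrase this message differently.
_MISMATCH = re.compile(
    r"from key schema type is (?P<key_from>\w+) and to key schema is (?P<key_to>\w+), "
    r"from value schema is (?P<value_from>\w+) and to value schema is (?P<value_to>\w+)"
)


def parse_schema_mismatch(message: str) -> Optional[Dict[str, str]]:
    """Return the four schema types named in the message, or None if absent."""
    match = _MISMATCH.search(message)
    return match.groupdict() if match else None


# The message text as it appears in the IncompatibleSchemaException above.
message = (
    "Key schemas or Value schemas are different schema type, "
    "from key schema type is BYTES and to key schema is JSON, "
    "from value schema is BYTES and to value schema is JSON"
)
print(parse_schema_mismatch(message))
```

For this log, both the key and the value are reported as going from BYTES to JSON, so the conflict is on both sides of the key/value pair rather than only one.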
When I use AvroConverter ("org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter") instead, I get a different error:
```
12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = binlog-client
12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = snapshot
12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Creating thread debezium-mysqlconnector-ztoreSalesDb-snapshot
12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�����"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�����"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
12:24:24,499 WARN [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Failed to process result of message org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord@37912f1e
java.lang.RuntimeException: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�����"; line: 1, column: 2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:448) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�����"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�����"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
... 3 more
12:24:24,501 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in source read:
java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.Exception: Sink Error
at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
... 1 more
12:24:24,502 ERROR [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - [ztore-data/debezium-local/debezium-mysql-source:0] Uncaught exception in Java Instance
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:464) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
... 2 more
Caused by: java.lang.Exception: Sink Error
at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
... 1 more
12:24:24,503 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Closing instance
```
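One possible reading of the trace above: `KafkaSchema.encode` passes the converter output to `JsonDeserializer`, and the JSON parser rejects a character with code 0 near the start of the payload. Confluent's Avro wire format starts each payload with a magic byte of 0 followed by a 4-byte big-endian schema ID, so Avro-framed bytes would fail a JSON parse in this way. The sketch below checks for that framing. It is a heuristic under that assumption, the function name and sample payloads are hypothetical, and it is not code from Pulsar or from this PR.

```python
import struct
from typing import Optional

# First byte of the Confluent wire format.
CONFLUENT_MAGIC_BYTE = 0


def confluent_schema_id(payload: bytes) -> Optional[int]:
    """Return the schema ID if the payload looks Confluent-framed, else None.

    Heuristic only: a leading zero byte does not prove the payload is Avro.
    """
    if len(payload) < 5:
        return None
    # ">bI" = big-endian, one signed byte (magic) plus one unsigned 32-bit int (ID).
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != CONFLUENT_MAGIC_BYTE:
        return None
    return schema_id


# Hypothetical payloads for illustration only.
avro_framed = b"\x00\x00\x00\x00\x07" + b"\x02\x06abc"
json_text = b'{"id": 1}'

print(confluent_schema_id(avro_framed))  # framed payload: a schema ID is returned
print(confluent_schema_id(json_text))    # JSON text starts with "{", so None
```

If a check like this returns a schema ID for the bytes reaching the JSON deserializer, that would suggest the Avro output is being routed through a JSON-only encode path.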