Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/25 04:34:22 UTC
[GitHub] [pulsar] dennisylyung commented on issue #6034: [Pulsar IO][Issue 5633]Support avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#issuecomment-603635083
I tried building and running it.
When I use JsonConverter (`org.apache.kafka.connect.json.JsonConverter`), I get this error:
```
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 tables in 00:00:00.228
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read locks to enable MySQL writes
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a total of 00:00:02.03
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
12:29:56.095 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.336 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.634 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Key schemas or Value schemas are different schema type, from key schema type is BYTES and to key schema is JSON, from value schema is BYTES and to value schema is JSON
at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_221]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_221]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) [?:1.8.0_221]
at org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733) [pulsar-client-original.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329) [pulsar-common.jar:2.6.0-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
```
And when I use AvroConverter (`org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter`), I get this error:
```
12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = binlog-client
12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = snapshot
12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Creating thread debezium-mysqlconnector-ztoreSalesDb-snapshot
12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"