You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/25 04:36:34 UTC

[GitHub] [pulsar] dennisylyung edited a comment on issue #6034: [Pulsar IO][Issue 5633]Support avro schema for debezium connector

dennisylyung edited a comment on issue #6034: [Pulsar IO][Issue 5633]Support avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#issuecomment-603635083
 
 
   I have tried to build and run it. 
   When I use JsonConverter ("org.apache.kafka.connect.json.JsonConverter")
   There will be an error of:
   ```
   12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 tables in 00:00:00.228
   12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
   12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read locks to enable MySQL writes
   12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a total of 00:00:02.03
   12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
   12:29:56.095 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
   12:29:56.336 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
   12:29:56.634 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
   org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Key schemas or Value schemas are different schema type, from key schema type is BYTES and to key schema is JSON, from value schema is BYTES and to value schema is JSON
           at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_221]
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_221]
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_221]
           at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) [?:1.8.0_221]
           at org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733) [pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329) [pulsar-common.jar:2.6.0-SNAPSHOT]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   ```
   I have made sure to delete all the existing schemas before running so the schema mutation error is not stem from leftover schema from previous runs.
   
   And if I use AvroConverter (org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter)
   There will be an error of:
   ```
   12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = binlog-client
   12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = snapshot
   12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Creating thread debezium-mysqlconnector-ztoreSalesDb-snapshot
   12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values: 
   	connect.meta.data = true
   	enhanced.avro.schema.support = false
   	schemas.cache.config = 1000
   
   12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write: 
   org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
   	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
   	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
   	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
   	at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   12:24:24,499 WARN [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Failed to process result of message org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord@37912f1e
   java.lang.RuntimeException: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:448) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
   	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
   	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
   	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
   	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
   	at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	... 3 more
   12:24:24,501 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in source read: 
   java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: java.lang.Exception: Sink Error
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	... 1 more
   12:24:24,502 ERROR [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - [ztore-data/debezium-local/debezium-mysql-source:0] Uncaught exception in Java Instance
   java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:464) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[?:?]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	... 2 more
   Caused by: java.lang.Exception: Sink Error
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[?:?]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
   	... 1 more
   12:24:24,503 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Closing instance
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services