Posted to commits@seatunnel.apache.org by "chenwei182729 (via GitHub)" <gi...@apache.org> on 2023/01/31 13:55:51 UTC

[GitHub] [incubator-seatunnel] chenwei182729 opened a new issue, #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

chenwei182729 opened a new issue, #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
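   When using SeaTunnel's own engine to sink to Kafka, the exception below is thrown.
   A possible cause: org.apache.kafka.common.serialization.ByteArraySerializer is loaded by SeatunnelChildFirstClassLoader, while org.apache.kafka.common.serialization.Serializer is loaded by AppClassLoader, in which case the JVM would treat the two as unrelated types.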
   
   ```
   Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
   	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:42) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:99) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:82) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
   	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:196) ~[classes/:?]
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:323) ~[classes/:?]
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_202]
   	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
   	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_202]
   	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
   	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_202]
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_202]
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
   	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_202]
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:321) ~[classes/:?]
   	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:87) ~[classes/:?]
   	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[classes/:?]
   	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:81) ~[classes/:?]
   	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[hazelcast-5.1.jar:5.1]
   	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[hazelcast-5.1.jar:5.1]
   	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[classes/:?]
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:230) ~[classes/:?]
   	... 10 more
   ```
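
The suspected cause above (an implementation class and its interface defined by two different class loaders) can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: `Codec`, `ByteCodec`, and `ChildFirstLoader` are hypothetical stand-ins for `Serializer`, `ByteArraySerializer`, and a child-first loader, and none of it is SeaTunnel's or Kafka's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderMismatchDemo {

    /** Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer. */
    public interface Codec { }

    /** Hypothetical stand-in for ByteArraySerializer. */
    public static class ByteCodec implements Codec { }

    /** Hypothetical child-first loader: defines the demo's nested classes itself instead of delegating. */
    static class ChildFirstLoader extends ClassLoader {
        ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                // Child-first only for the nested demo classes; everything else goes to the parent.
                if (loaded == null && name.startsWith(ClassLoaderMismatchDemo.class.getName() + "$")) {
                    loaded = defineFromParentBytes(name);
                }
                if (loaded == null) {
                    return super.loadClass(name, resolve);
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }

        /** Reads the compiled class bytes via the parent loader and defines a second copy here. */
        private Class<?> defineFromParentBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Checks whether the child-loaded implementation is assignable to the interface seen by this class. */
    static boolean implAssignableToAppLoaderInterface() {
        ClassLoader app = ClassLoaderMismatchDemo.class.getClassLoader();
        try {
            Class<?> impl = new ChildFirstLoader(app).loadClass(ByteCodec.class.getName());
            // impl implements the child loader's copy of Codec, not the one referenced here.
            return Codec.class.isAssignableFrom(impl);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("assignable across loaders: " + implAssignableToAppLoaderInterface());
    }
}
```

Assuming the compiled class files are readable as resources from the application class loader, the check comes back negative: the interface the implementation implements and the interface it is compared against share a name but were defined by different loaders. That is the condition the "is not an instance of" message would describe if the suspicion above is right.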
   
   
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set flink configuration here
     execution.parallelism = 1
     job.mode = "BATCH"
     execution.checkpoint.interval = 5000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
     # This is an example source plugin **only for testing and demonstrating the source plugin feature**
     FakeSource {
       result_table_name = "fake"
       schema = {
         fields {
           name = "string"
           age = "int"
         }
       }
       parallelism = 3
     }
   }
   
   transform {
   }
   
   sink {
     kafka {
       topic = "quickstart-seatunnel"
       bootstrap.servers="127.0.0.1:9092"
       partition=1
       format=json
       kafka.request.timeout.ms=60000
       kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
       kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
     }
   }
   ```
   
   
   ### Running Command
   
   Added the following method to `org.apache.seatunnel.engine.client.SeaTunnelClientTest` and ran it as a test:
   
   ```java
   @Test
   public void testExecuteJob_kafka() {
       Common.setDeployMode(DeployMode.CLIENT);
       String filePath = TestUtils.getResource("/batch_fakesource_to_kafka.conf");
       JobConfig jobConfig = new JobConfig();
       jobConfig.setName("fake_to_kafka");
   
       JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);
   
       try {
           final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
           // Wait for the job on a separate thread so the assertion below can poll its status.
           CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
               return clientJobProxy.waitForJobComplete();
           });
   
           // Fail unless the job reaches FINISHED within 180 seconds.
           await().atMost(180000, TimeUnit.MILLISECONDS)
                   .untilAsserted(() -> Assertions.assertTrue(
                           objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
   
       } catch (ExecutionException | InterruptedException e) {
           throw new RuntimeException(e);
       }
   }
   ```
   
   
   ### Error Exception
   
   ```log
   Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
   ```
   
   The full stack trace is identical to the one under "What happened" above.
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   1.8.0
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4023:
URL: https://github.com/apache/seatunnel/issues/4023#issuecomment-2078807741

   link https://github.com/apache/seatunnel/pull/6355




[GitHub] [incubator-seatunnel] chenwei182729 closed issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted by "chenwei182729 (via GitHub)" <gi...@apache.org>.
chenwei182729 closed issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
URL: https://github.com/apache/incubator-seatunnel/issues/4023




[GitHub] [incubator-seatunnel] chenwei182729 commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted by "chenwei182729 (via GitHub)" <gi...@apache.org>.
chenwei182729 commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1435693398

   Bug is fixed




[GitHub] [incubator-seatunnel] hailin0 commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1410415849

   @Hisoka-X 




[GitHub] [incubator-seatunnel] Hisoka-X commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1411497141

   Can you use dev branch test again?

