Posted to commits@seatunnel.apache.org by "chenwei182729 (via GitHub)" <gi...@apache.org> on 2023/01/31 13:55:51 UTC
[GitHub] [incubator-seatunnel] chenwei182729 opened a new issue, #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
chenwei182729 opened a new issue, #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023
### Search before asking
- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
### What happened
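When sinking to Kafka with SeaTunnel's own engine, the exception below is thrown.
A possible cause: org.apache.kafka.common.serialization.ByteArraySerializer is loaded by SeatunnelChildFirstClassLoader, while org.apache.kafka.common.serialization.Serializer is loaded by AppClassLoader, so the JVM treats the two as unrelated types.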
```
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:42) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:99) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:82) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:196) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:323) ~[classes/:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_202]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_202]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_202]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:321) ~[classes/:?]
at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:87) ~[classes/:?]
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[classes/:?]
at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:81) ~[classes/:?]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[hazelcast-5.1.jar:5.1]
at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[classes/:?]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:230) ~[classes/:?]
... 10 more
```
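The suspected cause above can be reproduced in isolation. The following is a minimal sketch, not SeaTunnel or Kafka code: `Serializer` and `ByteArraySerializer` here are hypothetical stand-ins for the Kafka types, and an isolated `URLClassLoader` is assumed as a stand-in for `SeatunnelChildFirstClassLoader`. It assumes the demo class is compiled to the file system or a jar, so that a second loader can read the same class files. It shows that an interface loaded by one class loader is not assignable from an implementation loaded by another, which is the kind of check that produces "is not an instance of".

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderMismatchDemo {

    /** Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer. */
    public interface Serializer {}

    /** Hypothetical stand-in for ByteArraySerializer. */
    public static class ByteArraySerializer implements Serializer {}

    /**
     * Builds a loader with no application parent that re-reads this demo's
     * classes from the same classpath location, so it defines its own copies.
     */
    public static ClassLoader isolatedLoader() {
        URL location = ClassLoaderMismatchDemo.class
                .getProtectionDomain().getCodeSource().getLocation();
        return new URLClassLoader(new URL[] {location}, null);
    }

    /** Loads implName through the given loader and checks it against iface. */
    public static boolean implementsInterface(Class<?> iface, String implName, ClassLoader loader) {
        try {
            Class<?> impl = Class.forName(implName, true, loader);
            return iface.isAssignableFrom(impl);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load " + implName, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader appLoader = ClassLoaderMismatchDemo.class.getClassLoader();
        String implName = ByteArraySerializer.class.getName();

        // Interface and implementation come from the same loader: the check passes.
        System.out.println(implementsInterface(Serializer.class, implName, appLoader));
        // Interface from the application loader, implementation from another loader: the check fails.
        System.out.println(implementsInterface(Serializer.class, implName, isolatedLoader()));
    }
}
```

Printing `impl.getClassLoader()` and `iface.getClassLoader()` at the failing call site is a quick way to confirm whether this is what happens in a real job.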
### SeaTunnel Version
2.3.1
### SeaTunnel Config
```conf
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  execution.checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    parallelism = 3
  }
}
transform {
}
sink {
  kafka {
    topic = "quickstart-seatunnel"
    bootstrap.servers = "127.0.0.1:9092"
    partition = 1
    format = json
    kafka.request.timeout.ms = 60000
    kafka.key.serializer = org.apache.kafka.common.serialization.StringSerializer
    kafka.value.serializer = org.apache.kafka.common.serialization.StringSerializer
  }
}
```
### Running Command
```java
// Added to the class org.apache.seatunnel.engine.client.SeaTunnelClientTest and run as a test.
@Test
public void testExecuteJob_kafka() {
    Common.setDeployMode(DeployMode.CLIENT);
    String filePath = TestUtils.getResource("/batch_fakesource_to_kafka.conf");
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("fake_to_kafka");
    JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);
    try {
        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
        // Wait for the job on a separate thread so the test can poll its status.
        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return clientJobProxy.waitForJobComplete();
        });
        await().atMost(180000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> Assertions.assertTrue(
                objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}
```
### Error Exception
```log
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
```
The full stack trace is identical to the one under "What happened" above.
### Flink or Spark Version
_No response_
### Java or Scala Version
1.8.0
### Screenshots
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [I] [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer [seatunnel]
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4023:
URL: https://github.com/apache/seatunnel/issues/4023#issuecomment-2078807741
Linked PR: https://github.com/apache/seatunnel/pull/6355
[GitHub] [incubator-seatunnel] chenwei182729 closed issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Posted by "chenwei182729 (via GitHub)" <gi...@apache.org>.
chenwei182729 closed issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
URL: https://github.com/apache/incubator-seatunnel/issues/4023
[GitHub] [incubator-seatunnel] chenwei182729 commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Posted by "chenwei182729 (via GitHub)" <gi...@apache.org>.
chenwei182729 commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1435693398
Bug is fixed
[GitHub] [incubator-seatunnel] hailin0 commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1410415849
@Hisoka-X
[GitHub] [incubator-seatunnel] Hisoka-X commented on issue #4023: [Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on issue #4023:
URL: https://github.com/apache/incubator-seatunnel/issues/4023#issuecomment-1411497141
Can you test again using the dev branch?