Posted to user-zh@flink.apache.org by lp <97...@qq.com> on 2021/01/22 08:14:17 UTC

flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Test code:
--------------------------
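import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.StringJoiner;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Sink_KafkaSink_1 {
    public static void main(String[] args) throws Exception {
        // Read "host" and "kafkaPort" from pro.properties on the classpath
        final ParameterTool params =
                ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization
                properties);
        // Attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}

class CustomsourceFuncation implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Sample messages (a "book ranking list") to pick from at random
        List<String> books = new ArrayList<>();
        books.add("msg1");
        books.add("msg2");
        books.add("msg3");
        books.add("msg4");
        books.add("msg5");
        Random random = new Random();
        while (isRunning) {
            ctx.collect(books.get(random.nextInt(books.size())));
            // Emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // Called when the source is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}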
------------------------------------------

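Local testing raised no exceptions. After packaging with Maven and submitting to the YARN cluster in Application Mode, the JobManager logs the following error over and over: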
----------------------------------
2021-01-22 07:54:31,929 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[quickstart-0.1.jar:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
        ... 23 more
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
----------------------------------


Flink version is 1.12.1. Deploying the same job in per-job mode works fine: data arrives in the flinktest topic and can be consumed normally.
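A quick way to see where the two classes named in the `Caused by` line actually come from at runtime is to log each class's defining classloader and jar location. The sketch below uses only JDK reflection (`Class.forName`, `getProtectionDomain().getCodeSource()`); the class name `ClassOriginReport` is hypothetical. It is meant to be called from code that runs on the TaskManager (for example a rich function's `open()`), where Flink sets the thread context classloader to the job's user-code classloader.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Flink or Kafka).
public class ClassOriginReport {

    /** Describes which classloader defines a class, and from which jar/location, as seen via the given loader. */
    public static String describe(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate and define the class, do not run static initializers
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return className
                    + " -> loader=" + clazz.getClassLoader()
                    + ", location=" + (location == null ? "<bootstrap or unknown>" : location);
        } catch (ClassNotFoundException e) {
            return className + " -> not found via " + loader;
        }
    }

    public static void main(String[] args) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        String[] names = {
            "org.apache.kafka.common.serialization.Serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer",
        };
        for (String name : names) {
            // Compare what the user-code (context) loader and the system classpath each resolve
            System.out.println(describe(name, contextLoader));
            System.out.println(describe(name, systemLoader));
        }
    }
}
```

If the same class name reports different loaders or different jar locations, the Kafka classes are present on the classpath more than once.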



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted by felixzh <fe...@126.com>.
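Hi, this is a dependency problem. If flink/lib on the cluster already contains flink-connector-kafka.jar, the job's pom must declare that dependency with `provided` scope so that it is not packaged into the job jar as well.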
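The reply's fix works because the JVM identifies a class by its fully qualified name together with the classloader that defined it. If the Kafka classes exist both in flink/lib and inside the job jar, `ByteArraySerializer` may be defined by one loader (for example Flink's default child-first user-code classloader) while the `Serializer` interface that Kafka checks against comes from another, and the instance check fails even though the names match. This is one plausible reading of the stack trace, not a confirmed diagnosis. The JDK-only sketch below reproduces the effect; `Serializer`, `ByteArraySerializer` and `ChildFirstLoader` are hypothetical local stand-ins, not the Kafka or Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassIdentityDemo {

    // Hypothetical stand-ins for Kafka's Serializer / ByteArraySerializer.
    public interface Serializer {}

    public static class ByteArraySerializer implements Serializer {}

    /** Simplified child-first loader: defines its own copy of the nested stand-ins instead of delegating. */
    static class ChildFirstLoader extends ClassLoader {
        private final String prefix = ClassIdentityDemo.class.getName() + "$";

        ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null && name.startsWith(prefix)) {
                    c = defineCopy(name); // child first: define a second copy locally
                }
                if (c == null) {
                    c = super.loadClass(name, false); // everything else: normal parent delegation
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        // Reads the .class bytes the parent sees and defines them again in this loader.
        private Class<?> defineCopy(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Instantiates ByteArraySerializer via the child loader and checks it against the parent's Serializer. */
    public static boolean childCopyIsParentSerializer() {
        try {
            ClassLoader child = new ChildFirstLoader(ClassIdentityDemo.class.getClassLoader());
            Class<?> childImpl = Class.forName(ByteArraySerializer.class.getName(), true, child);
            Object instance = childImpl.getDeclaredConstructor().newInstance();
            return Serializer.class.isInstance(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same loader: " + (new ByteArraySerializer() instanceof Serializer)); // prints true
        System.out.println("child copy:  " + childCopyIsParentSerializer()); // prints false
    }
}
```

Removing the second copy from the job jar (the `provided` scope in the reply) means only one loader ever defines these classes, so the check in `AbstractConfig.getConfiguredInstance` compares like with like.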