You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 潘明文 <pa...@163.com> on 2022/03/07 02:06:45 UTC

Re:Re:回复:FlinkKafkaProducer 问题

目前出现下错误:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker


















在 2022-01-21 15:15:51,"潘明文" <pa...@163.com> 写道:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>HI ,
>   好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector flink-connector-kafka_2.11
>
>在 2022-01-21 14:36:06,"selves_nan" <se...@163.com> 写道:
>>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_nan@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月21日 13:00,潘明文<pa...@163.com> 写道:
>>HI,
>>"生产者的事务id"  怎么获取呀?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2022-01-21 10:41:37,"selves_nan" <se...@163.com> 写道:
>>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id");
>>//开启幂等性
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_nan@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月20日 14:39,潘明文<pa...@163.com> 写道:
>>hi,
>>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。
>>
>>FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>>at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>

hbase 写性能

Posted by 潘明文 <pa...@163.com>.
HI,

    目前环境是hbase4个Region server节点,内存为128`256, 目前写入hbase7000——10000多每秒,这正常,是否有调优的空间。谢谢!


代码如下:
BufferedMutatorParams params= new BufferedMutatorParams(TableName.valueOf(key));
//设置缓存10m,当达到10m时数据会自动刷到hbase
params.writeBufferSize(10*1024*1024L);//设置缓存的大小
params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);//写缓存刷写时间为5s
BufferedMutator mutator = connection.getBufferedMutator(params);
mutator.mutate(recordList);
mutator.flush();
mutator.close();

Re:Re:Re:回复:FlinkKafkaProducer 问题

Posted by 潘明文 <pa...@163.com>.
HI,


 flink 还是报以下错误:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker


代码如下:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
//设置producer 幂等性 保证producer 数据写入到broker 不重复
prop.setProperty("enable.idempotence", "true");
// 设置FlinkKafkaProducer里面的事务超时时间,默认broker的最大事务超时时间为15分钟
prop.setProperty("transaction.timeout.ms", transaction + "");
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my_tx_id");
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);


FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
WRITE_TOPIC,
serializationSchema,
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); 














At 2022-03-07 10:06:45, "潘明文" <pa...@163.com> wrote:

目前出现下错误:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker


















在 2022-01-21 15:15:51,"潘明文" <pa...@163.com> 写道:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>HI ,
>   好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector flink-connector-kafka_2.11
>
>在 2022-01-21 14:36:06,"selves_nan" <se...@163.com> 写道:
>>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_nan@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月21日 13:00,潘明文<pa...@163.com> 写道:
>>HI,
>>"生产者的事务id"  怎么获取呀?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2022-01-21 10:41:37,"selves_nan" <se...@163.com> 写道:
>>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id");
>>//开启幂等性
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_nan@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月20日 14:39,潘明文<pa...@163.com> 写道:
>>hi,
>>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。
>>
>>FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>>at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>