Posted to commits@seatunnel.apache.org by "jiangzzwy (via GitHub)" <gi...@apache.org> on 2023/04/17 13:35:50 UTC

[GitHub] [incubator-seatunnel] jiangzzwy opened a new issue, #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

jiangzzwy opened a new issue, #4597:
URL: https://github.com/apache/incubator-seatunnel/issues/4597

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
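   When the job parallelism is greater than 2 and Kafka is used as the sink, the backend logs an error while the service starts up. The error does not affect the job afterwards, but it is confusing for users.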
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 3
     job.mode = "STREAMING"
     checkpoint.interval = 2000
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
      Fmq {
       parallelism = 2
       format = "JSON"
       address = "xxxx"
       app = "xxx"
       password = "****"
       topic = "xxx"
       batch_size = 10
       batch_window = 1000
       wait_interval = 1000
     }
   }
   
   sink {
   
       Kafka {
         topic = "t_count"
         bootstrap.servers = "localhost:9092"
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   % ./bin/seatunnel.sh --config ./config/fmq_kafka2.template -e local
   ```
   
   
   ### Error Exception
   
   ```log
   2023-04-17 20:46:13,136 ERROR org.apache.kafka.common.metrics.Metrics - Error when registering metric on org.apache.kafka.common.metrics.JmxReporter
   org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=kafka-metrics-count,client-id=producer-1
   	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:229) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:144) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:573) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:512) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:495) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:480) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:180) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:134) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:357) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:43) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:104) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:102) ~[connector-kafka-2.3.1.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:246) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:383) ~[seatunnel-starter.jar:2.3.1]
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_311]
   	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_311]
   	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_311]
   	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_311]
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_311]
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_311]
   	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_311]
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_311]
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_311]
   	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_311]
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:380) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:96) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:85) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[seatunnel-starter.jar:2.3.1]
   	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:51) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:272) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.restoreTaskState(CheckpointCoordinator.java:249) ~[seatunnel-starter.jar:2.3.1]
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$reportedTask$1(CheckpointCoordinator.java:189) ~[seatunnel-starter.jar:2.3.1]
   	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) [?:1.8.0_311]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
   Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=kafka-metrics-count,client-id=producer-1
   	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_311]
   	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_311]
   	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_311]
   	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_311]
   	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_311]
   	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_311]
   	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:227) ~[connector-kafka-2.3.1.jar:2.3.1]
   	... 49 more
   ```
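
   The `Caused by` line above says that an MBean with the same name (`client-id=producer-1`) was already registered in the JVM. As a rough illustration of that failure mode only, here is a minimal sketch using plain JDK JMX classes. It does not use Kafka; `DuplicateMBeanDemo`, `Counter`, and the `demo.producer` domain are made-up names for this example.

   ```java
   import java.lang.management.ManagementFactory;
   import javax.management.InstanceAlreadyExistsException;
   import javax.management.JMException;
   import javax.management.MBeanServer;
   import javax.management.ObjectName;

   public class DuplicateMBeanDemo {
       // Standard MBean convention: the interface is named <ClassName>MBean.
       public interface CounterMBean {
           int getCount();
       }

       // Hypothetical stand-in for a metrics bean; not a Kafka class.
       public static class Counter implements CounterMBean {
           @Override
           public int getCount() {
               return 0;
           }
       }

       /** Registers two beans under one name; returns true if the second is rejected. */
       public static boolean secondRegistrationFails(String name) {
           // The platform MBeanServer is shared by everything running in the JVM.
           MBeanServer server = ManagementFactory.getPlatformMBeanServer();
           try {
               ObjectName objectName = new ObjectName(name);
               server.registerMBean(new Counter(), objectName);
               try {
                   // Same ObjectName again: this is the call that is expected to fail.
                   server.registerMBean(new Counter(), objectName);
                   return false;
               } catch (InstanceAlreadyExistsException e) {
                   return true;
               } finally {
                   // Clean up the first registration so the demo can be re-run.
                   server.unregisterMBean(objectName);
               }
           } catch (JMException e) {
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           String name = "demo.producer:type=kafka-metrics-count,client-id=producer-1";
           System.out.println(secondRegistrationFails(name));
       }
   }
   ```

   If the sink writers in one JVM each create a producer that ends up with the same client id, the second registration would be rejected in the same way, which matches an error that is logged without stopping the job.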
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   java
   
   ### Screenshots
   
   ![image](https://user-images.githubusercontent.com/23492991/232499897-dd08d4ef-159b-4620-97b1-3a60309fc6b2.png)
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

Posted by "laglangyue (via GitHub)" <gi...@apache.org>.
laglangyue commented on issue #4597:
URL: https://github.com/apache/incubator-seatunnel/issues/4597#issuecomment-1513118708

   If possible, could you submit a PR? Looking forward to it.




[GitHub] [seatunnel] github-actions[bot] closed issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException
URL: https://github.com/apache/seatunnel/issues/4597




[GitHub] [seatunnel] github-actions[bot] commented on issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4597:
URL: https://github.com/apache/seatunnel/issues/4597#issuecomment-1641205852

   This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter a similar problem in the future.




[GitHub] [incubator-seatunnel] github-actions[bot] commented on issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4597:
URL: https://github.com/apache/incubator-seatunnel/issues/4597#issuecomment-1553844707

   This issue has been automatically marked as stale because it has not had any activity for 30 days. It will be closed in the next 7 days if no further activity occurs.




[GitHub] [incubator-seatunnel] jiangzzwy commented on issue #4597: [Bug] [connector-kafka] InstanceAlreadyExistsException

Posted by "jiangzzwy (via GitHub)" <gi...@apache.org>.
jiangzzwy commented on issue #4597:
URL: https://github.com/apache/incubator-seatunnel/issues/4597#issuecomment-1511368753

   Solution: pass the SubTaskIndex in when constructing the Producer, and the error no longer occurs. Reference: https://blog.csdn.net/qq_33642970/article/details/106410448
   ![image](https://user-images.githubusercontent.com/23492991/232501289-e76f33cf-75c5-44e1-9eeb-ae4ba5d03457.png)
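
   The actual change is only visible in the screenshot, so the following is a sketch of the idea rather than the real patch. It assumes the subtask index is used to build a distinct `client.id` for each producer, since Kafka puts the client id into the MBean name (`client-id=...`). `SubtaskProducerConfig`, `forSubtask`, and the `seatunnel-kafka-sink` prefix are hypothetical names; `client.id` and `bootstrap.servers` are standard Kafka producer settings.

   ```java
   import java.util.Properties;

   public class SubtaskProducerConfig {
       /**
        * Builds producer properties whose client.id is unique per subtask.
        * The prefix and the "prefix-index" format are assumptions for illustration.
        */
       public static Properties forSubtask(String bootstrapServers, String clientIdPrefix, int subtaskIndex) {
           Properties props = new Properties();
           props.setProperty("bootstrap.servers", bootstrapServers);
           // A distinct client.id per subtask gives each producer its own MBean name.
           props.setProperty("client.id", clientIdPrefix + "-" + subtaskIndex);
           return props;
       }

       public static void main(String[] args) {
           // Three subtasks, matching execution.parallelism = 3 in the config above.
           for (int i = 0; i < 3; i++) {
               Properties props = forSubtask("localhost:9092", "seatunnel-kafka-sink", i);
               System.out.println(props.getProperty("client.id"));
           }
       }
   }
   ```

   The resulting `Properties` would then be handed to the `KafkaProducer` constructor in place of a config that leaves `client.id` unset.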
   

