Posted to commits@seatunnel.apache.org by "kim-up (via GitHub)" <gi...@apache.org> on 2023/05/05 07:21:16 UTC

[GitHub] [incubator-seatunnel] kim-up opened a new issue, #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

kim-up opened a new issue, #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706

   ### Search before asking
   
   - [X] I have searched the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   When I run a Pulsar batch task using the config below:
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     Pulsar {
       topic = "test_topic_source"
       subscription.name = "seatunnel"
       client.service-url = "pulsar://host:6650"
       admin.service-url = "http://host:8080"
       result_table_name = "pulsar_table"
       format_error_handle_way = skip
       cursor.startup.mode = "EARLIEST"
       cursor.stop.mode = "LATEST"
       schema = {
         fields {
           id = bigint
           c_map = "map<string, smallint>"
           c_array = "array<tinyint>"
           c_string = string
           c_boolean = boolean
           c_tinyint = tinyint
           c_smallint = smallint
           c_int = int
           c_bigint = bigint
           c_float = float
           c_double = double
           c_decimal = "decimal(2, 1)"
           c_bytes = bytes
           c_date = date
           c_timestamp = timestamp
         }
       }
     }
   }
   
   transform {
   }
   
   sink {
     Console {
       source_table_name = "pulsar_table"
     }
   }
   ```
   The following error occurred:
   ```
   Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
   	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188)
   	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
   	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:218)
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:214)
   	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$startTriggerPendingCheckpoint$7(CheckpointCoordinator.java:411)
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed.
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
   	at java.util.HashMap.forEach(HashMap.java:1290)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
   	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
   	at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
   	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
   	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
   	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
   	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
   	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
   	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
   	... 29 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
   	... 31 more
   Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
   	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
   	... 32 more
   
   	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
   	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
   	at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1298)
   	at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
   	at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
   	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
   	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
   	at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
   	at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
   	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed.
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
   	at java.util.HashMap.forEach(HashMap.java:1290)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
   	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
   	at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
   	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
   	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
   	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
   	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
   	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
   	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
   	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
   	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
   	at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
   	... 29 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
   	... 31 more
   Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
   	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
   	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
   	... 32 more
   
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97)
   	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
   	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
   	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
   	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
   	... 5 more
   
   	at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:123)
   	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
   	... 2 more
   ```
   
   ### SeaTunnel Version
   
   2.3.1-SNAPSHOT
   
   ### SeaTunnel Config
   
   ```conf
   *
   ```
   
   
   ### Running Command
   
   ```shell
   *
   ```
   
   
   ### Error Exception
   
   ```log
   *
   ```
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   ![image](https://user-images.githubusercontent.com/9264192/236398541-a66abae9-b89a-4054-8c12-629650541b82.png)
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] github-actions[bot] commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4706:
URL: https://github.com/apache/seatunnel/issues/4706#issuecomment-1586398423

   This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.




[GitHub] [incubator-seatunnel] hailin0 commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706#issuecomment-1535840108

   
   Fix suggestion: move the `consumer#close` call into the `catch` block.
   
   Reference:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java#L107
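
The stack trace in the report shows `acknowledgeCumulative` being called from `notifyCheckpointComplete` on a consumer that is already in state `Closed`. The sketch below illustrates why moving the close call into `catch` would avoid that. It is not the connector's code: `FakeConsumer`, `readThenClose`, `readClosingInCatch`, and `commitCursor` are hypothetical names, and it assumes (the thread does not show the source) that the reader currently closes the consumer as soon as the bounded read finishes.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumerCloseSketch {

    /** Hypothetical stand-in for a Pulsar consumer: acknowledging after close fails. */
    static class FakeConsumer {
        private boolean closed = false;
        private final List<Long> acked = new ArrayList<>();

        void acknowledgeCumulative(long messageId) {
            if (closed) {
                throw new IllegalStateException("Consumer not ready. State: Closed");
            }
            acked.add(messageId);
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Assumed problematic shape: the consumer is closed whenever reading ends. */
    static void readThenClose(FakeConsumer consumer, int messages) {
        try {
            for (int i = 0; i < messages; i++) {
                // pretend to receive one message and emit one record
            }
        } finally {
            consumer.close();
        }
    }

    /** Suggested shape: the consumer is closed only when reading fails. */
    static void readClosingInCatch(FakeConsumer consumer, int messages) {
        try {
            for (int i = 0; i < messages; i++) {
                // pretend to receive one message and emit one record
            }
        } catch (RuntimeException e) {
            consumer.close();
            throw e;
        }
    }

    /** Simulates the checkpoint-complete callback that commits the cursor after reading ended. */
    static boolean commitCursor(FakeConsumer consumer, long lastMessageId) {
        try {
            consumer.acknowledgeCumulative(lastMessageId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeConsumer a = new FakeConsumer();
        readThenClose(a, 3);
        System.out.println("close after read, commit ok: " + commitCursor(a, 2L));

        FakeConsumer b = new FakeConsumer();
        readClosingInCatch(b, 3);
        System.out.println("close in catch, commit ok: " + commitCursor(b, 2L));
        b.close(); // whoever owns the consumer still closes it after the final commit
    }
}
```

With close moved into `catch`, a successful read leaves the consumer open, so something else (for example the reader's own close path) must still release it after the last cursor commit.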




[GitHub] [seatunnel] github-actions[bot] closed issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
URL: https://github.com/apache/seatunnel/issues/4706




[GitHub] [incubator-seatunnel] kim-up commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

Posted by "kim-up (via GitHub)" <gi...@apache.org>.
kim-up commented on issue #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706#issuecomment-1535850777

   @hailin0 Then shall I put the fix in the same PR as #4684? That PR also adds the Pulsar e2e tests; there are none at the moment.




[GitHub] [seatunnel] github-actions[bot] commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4706:
URL: https://github.com/apache/seatunnel/issues/4706#issuecomment-1575859423

   This issue has been automatically marked as stale because it has not had any activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

