You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "kim-up (via GitHub)" <gi...@apache.org> on 2023/05/05 07:21:16 UTC
[GitHub] [incubator-seatunnel] kim-up opened a new issue, #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
kim-up opened a new issue, #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706
### Search before asking
- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
### What happened
When I run a `Pulsar Batch Task ` using below config :
```
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
Pulsar {
topic = "test_topic_source"
subscription.name = "seatunnel"
client.service-url = "pulsar://host:6650"
admin.service-url = "http://host:8080"
result_table_name = "pulsar_table"
format_error_handle_way = skip
cursor.startup.mode = "EARLIEST"
cursor.stop.mode = "LATEST"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
transform {
}
sink {
Console {
source_table_name = "pulsar_table"
}
}
```
Following error occurred:
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:218)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:214)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$startTriggerPendingCheckpoint$7(CheckpointCoordinator.java:411)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed.
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
at java.util.HashMap.forEach(HashMap.java:1290)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
... 29 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
... 31 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
... 32 more
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1298)
at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed.
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
at java.util.HashMap.forEach(HashMap.java:1290)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
... 29 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
... 31 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed
at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
... 32 more
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
... 5 more
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:123)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
... 2 more
```
### SeaTunnel Version
2.3.1-SNAPSHOT
### SeaTunnel Config
```conf
*
```
### Running Command
```shell
*
```
### Error Exception
```log
*
```
### Flink or Spark Version
_No response_
### Java or Scala Version
_No response_
### Screenshots
![image](https://user-images.githubusercontent.com/9264192/236398541-a66abae9-b89a-4054-8c12-629650541b82.png)
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [seatunnel] github-actions[bot] commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4706:
URL: https://github.com/apache/seatunnel/issues/4706#issuecomment-1586398423
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] hailin0 commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on issue #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706#issuecomment-1535840108
fix suggestion
move `consumer#close` to `catch`
reference
https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java#L107
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [seatunnel] github-actions[bot] closed issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
URL: https://github.com/apache/seatunnel/issues/4706
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] kim-up commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
Posted by "kim-up (via GitHub)" <gi...@apache.org>.
kim-up commented on issue #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706#issuecomment-1535850777
@hailin0 Then put it in a PR together with #4684? Because the e2e of pulsar is added by the way, there is no e2e now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [seatunnel] github-actions[bot] commented on issue #4706: [Bug] [Connector-v2] [pulsar-source] Unreasonable shutdown of consumer results in: `consumer not ready. State: Closed`
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #4706:
URL: https://github.com/apache/seatunnel/issues/4706#issuecomment-1575859423
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org