Posted to issues@flink.apache.org by "Shiao-An Yuan (Jira)" <ji...@apache.org> on 2021/10/19 10:33:00 UTC

[jira] [Updated] (FLINK-24587) Let PubSub source support changing subscriptions

     [ https://issues.apache.org/jira/browse/FLINK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shiao-An Yuan updated FLINK-24587:
----------------------------------
    Description: 
Original post on user mailing list: [link|https://lists.apache.org/thread.html/ra3047e5105fccbea42de6c37d52d05b492af496bd0bb95cc534630de%40%3Cuser.flink.apache.org%3E]

After resuming a Flink application from a snapshot with a *new subscription*, I got the following error repeatedly.

 
{code:java}
org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
        at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
        at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
{code}
 

As I see it, the [AckId|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L149] becomes invalid as soon as we switch to another subscription.

I also noticed something interesting. The checkpoint/savepoint process is as follows:
 # Write a checkpoint/savepoint that contains the ackIds of the messages not yet acknowledged
 # If the checkpoint/savepoint succeeds, acknowledge those messages ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L84])
 # Remove those ackIds from state ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L87])

If we resume a job from a snapshot, the ackIds that were acknowledged and then removed in step 3 still exist in the savepoint created in step 1, so the source acknowledges them again when the next checkpoint completes.

In my opinion, the ackIds stored in the savepoint are the root cause that prevents us from changing subscriptions.
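
The sequence described above can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions, not the connector's code: {{FakePubSub}} and {{Source}} are hypothetical stand-ins for the Pub/Sub service and for the bookkeeping done in AcknowledgeOnCheckpoint, and the only rule assumed about Pub/Sub is the one the error message reports, namely that an ackId can only be acknowledged on the subscription it was pulled from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckOnCheckpointSketch {

    /** Hypothetical stand-in for Pub/Sub: each ackId is bound to the subscription it was pulled from. */
    static final class FakePubSub {
        private final Map<String, String> ackIdToSubscription = new HashMap<>();
        private int nextId = 0;

        String pull(String subscription) {
            String ackId = "ack-" + nextId++;
            ackIdToSubscription.put(ackId, subscription);
            return ackId;
        }

        void acknowledge(String subscription, List<String> ackIds) {
            for (String ackId : ackIds) {
                if (!subscription.equals(ackIdToSubscription.get(ackId))) {
                    throw new IllegalArgumentException(
                            "INVALID_ARGUMENT: subscription " + subscription
                                    + " does not belong to ack ID " + ackId);
                }
            }
        }
    }

    /** Simplified, hypothetical model of a source that acknowledges on checkpoint completion. */
    static final class Source {
        private final FakePubSub pubSub;
        private final String subscription;
        private final List<String> pendingAckIds;

        Source(FakePubSub pubSub, String subscription, List<String> restoredAckIds) {
            this.pubSub = pubSub;
            this.subscription = subscription;
            // Restoring from a snapshot brings back every ackId that was pending in step 1.
            this.pendingAckIds = new ArrayList<>(restoredAckIds);
        }

        void receive() {
            pendingAckIds.add(pubSub.pull(subscription));
        }

        /** Step 1: the snapshot contains the ackIds that are not yet acknowledged. */
        List<String> snapshotState() {
            return new ArrayList<>(pendingAckIds);
        }

        /** Steps 2 and 3: acknowledge, then drop the ackIds from state. */
        void notifyCheckpointComplete() {
            pubSub.acknowledge(subscription, pendingAckIds);
            pendingAckIds.clear(); // never reached if the acknowledge call throws
        }

        int pendingCount() {
            return pendingAckIds.size();
        }
    }

    public static void main(String[] args) {
        FakePubSub pubSub = new FakePubSub();
        Source original = new Source(pubSub, "sub-a", new ArrayList<String>());
        original.receive();
        original.receive();

        List<String> savepoint = original.snapshotState(); // step 1
        original.notifyCheckpointComplete();               // steps 2 and 3

        // The savepoint still holds both ackIds although they were already acknowledged.
        Source restored = new Source(pubSub, "sub-b", savepoint);
        try {
            restored.notifyCheckpointComplete();
        } catch (IllegalArgumentException e) {
            System.out.println("ack failed after changing subscription: " + e.getMessage());
        }
        System.out.println("ackIds still pending after failure: " + restored.pendingCount());
    }
}
```

In this model the failed acknowledge call leaves the restored ackIds in state, so every later checkpoint completion retries them and fails the same way, which is consistent with the error appearing repeatedly.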


> Let PubSub source support changing subscriptions
> ------------------------------------------------
>
>                 Key: FLINK-24587
>                 URL: https://issues.apache.org/jira/browse/FLINK-24587
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Google Cloud PubSub
>    Affects Versions: 1.12.2
>            Reporter: Shiao-An Yuan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)