Posted to dev@flink.apache.org by "Shiao-An Yuan (Jira)" <ji...@apache.org> on 2021/10/19 07:42:00 UTC

[jira] [Created] (FLINK-24587) Let PubSub source support changing subscriptions

Shiao-An Yuan created FLINK-24587:
-------------------------------------

             Summary: Let PubSub source support changing subscriptions
                 Key: FLINK-24587
                 URL: https://issues.apache.org/jira/browse/FLINK-24587
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Google Cloud PubSub
    Affects Versions: 1.12.2
            Reporter: Shiao-An Yuan


Original post on user mailing list: [link|https://lists.apache.org/thread.html/ra3047e5105fccbea42de6c37d52d05b492af496bd0bb95cc534630de%40%3Cuser.flink.apache.org%3E]

After resuming a Flink application from a snapshot with a *new subscription*, I repeatedly got the following error:

 
{code:java}
org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
        at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
        at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
{code}
 

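As far as I can tell, the [AckId|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L149] becomes invalid as soon as we switch to another subscription, because Pub/Sub rejects an ackId that does not belong to the subscription it is acknowledged against (hence the error above).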

I also noticed something interesting. A checkpoint/savepoint proceeds as follows:
 # Write a checkpoint/savepoint that contains the ackIds of the messages that have not been acknowledged yet.
 # If the checkpoint/savepoint succeeds, acknowledge those ackIds ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L84]).
 # Remove those ackIds from state ([src|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java#L87]).
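
The three steps can be modeled roughly as below. This is a simplified sketch, not the connector's actual code: the class `AckOnCheckpointSketch`, the `Acknowledger` interface and all method names are hypothetical, and the real `AcknowledgeOnCheckpoint` may differ in detail. It keeps the ackIds captured by each checkpoint, acknowledges them once that checkpoint completes, and then drops them from live state, while the snapshot written in step 1 keeps its own copy.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of acknowledge-on-checkpoint.
 * Names are illustrative only; this is not the connector's API.
 */
public class AckOnCheckpointSketch {

    /** Hypothetical stand-in for the Pub/Sub acknowledge RPC. */
    interface Acknowledger {
        void acknowledge(List<String> ackIds);
    }

    private final Acknowledger acknowledger;
    /** ackIds of messages emitted since the last checkpoint. */
    private final List<String> sinceLastCheckpoint = new ArrayList<>();
    /** ackIds captured by each checkpoint that has not completed yet. */
    private final TreeMap<Long, List<String>> idsPerCheckpoint = new TreeMap<>();

    AckOnCheckpointSketch(Acknowledger acknowledger) {
        this.acknowledger = acknowledger;
    }

    /** A message was emitted downstream; keep its ackId until a checkpoint covers it. */
    void onMessage(String ackId) {
        sinceLastCheckpoint.add(ackId);
    }

    /** Step 1: persist the ackIds of all messages that are not acknowledged yet. */
    Map<Long, List<String>> snapshotState(long checkpointId) {
        idsPerCheckpoint.put(checkpointId, new ArrayList<>(sinceLastCheckpoint));
        sinceLastCheckpoint.clear();
        // Deep copy: the returned map plays the role of the stored checkpoint/savepoint.
        Map<Long, List<String>> snapshot = new TreeMap<>();
        idsPerCheckpoint.forEach((id, ids) -> snapshot.put(id, new ArrayList<>(ids)));
        return snapshot;
    }

    /** Steps 2 and 3: acknowledge everything up to this checkpoint, then forget it. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> completed = idsPerCheckpoint.headMap(checkpointId, true);
        List<String> toAck = new ArrayList<>();
        completed.values().forEach(toAck::addAll);
        if (!toAck.isEmpty()) {
            acknowledger.acknowledge(toAck); // step 2
        }
        completed.clear(); // step 3: headMap is a view, so this removes the entries
    }

    int pendingCheckpoints() {
        return idsPerCheckpoint.size();
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        AckOnCheckpointSketch source = new AckOnCheckpointSketch(acked::addAll);
        source.onMessage("ack-1");
        source.onMessage("ack-2");

        Map<Long, List<String>> savepoint = source.snapshotState(1L); // step 1
        source.notifyCheckpointComplete(1L);                           // steps 2 and 3

        System.out.println(acked);                       // [ack-1, ack-2]
        System.out.println(source.pendingCheckpoints()); // 0
        System.out.println(savepoint);                   // {1=[ack-1, ack-2]}
    }
}
{code}

In this model the live state is empty after step 3, but the persisted snapshot still lists the ackIds that were just acknowledged.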

If we resume a job from a snapshot, the ackIds that were already acknowledged (and removed from state in step 3) still exist in the savepoint (created in step 1), so they are acknowledged again when the next checkpoint completes.
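
The replay described above can be simulated with a small, hypothetical model. `FakeSubscription` is not a Pub/Sub client: it only mimics the rule that an ackId is accepted solely by the subscription that delivered it, and `restore`/`completeCheckpoint` are illustrative stand-ins for the connector's restore and acknowledge paths, not its real methods.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical simulation of resuming from a savepoint with a different subscription.
 * FakeSubscription is not a Pub/Sub client; it only mimics the rule that an ackId is
 * valid solely for the subscription that delivered it.
 */
public class ResumeReplaySketch {

    static final class FakeSubscription {
        private final String name;
        private final Set<String> issued = new HashSet<>();
        private int counter = 0;

        FakeSubscription(String name) {
            this.name = name;
        }

        /** "Delivers" one message and returns its ackId. */
        String pull() {
            String ackId = name + "#" + (++counter);
            issued.add(ackId);
            return ackId;
        }

        /** Rejects any ackId this subscription never handed out (mimics INVALID_ARGUMENT). */
        void acknowledge(List<String> ackIds) {
            for (String ackId : ackIds) {
                if (!issued.contains(ackId)) {
                    throw new IllegalArgumentException("INVALID_ARGUMENT: subscription "
                            + name + " does not own ack ID " + ackId);
                }
            }
        }
    }

    /** A resumed job starts from whatever pending ackIds the snapshot contained. */
    static TreeMap<Long, List<String>> restore(Map<Long, List<String>> snapshot) {
        return new TreeMap<>(snapshot);
    }

    /** Acknowledge all pending ackIds up to checkpointId, then drop them from state. */
    static void completeCheckpoint(
            TreeMap<Long, List<String>> state, long checkpointId, FakeSubscription sub) {
        Map<Long, List<String>> done = state.headMap(checkpointId, true);
        List<String> toAck = new ArrayList<>();
        done.values().forEach(toAck::addAll);
        sub.acknowledge(toAck); // if this throws, the entries below are never cleared
        done.clear();
    }

    public static void main(String[] args) {
        FakeSubscription oldSub = new FakeSubscription("old-sub");
        TreeMap<Long, List<String>> state = new TreeMap<>();

        // Step 1: checkpoint 7 captures one pending ackId and is persisted as a savepoint.
        state.put(7L, new ArrayList<>(List.of(oldSub.pull())));
        Map<Long, List<String>> savepoint = new TreeMap<>(state);

        // Steps 2 and 3: checkpoint 7 completes; the ackId is acked and removed from live state.
        completeCheckpoint(state, 7L, oldSub);

        // Resume from the savepoint, now reading from a new subscription.
        FakeSubscription newSub = new FakeSubscription("new-sub");
        TreeMap<Long, List<String>> restored = restore(savepoint);
        restored.put(8L, new ArrayList<>(List.of(newSub.pull())));

        try {
            completeCheckpoint(restored, 8L, newSub);
        } catch (IllegalArgumentException e) {
            // prints: INVALID_ARGUMENT: subscription new-sub does not own ack ID old-sub#1
            System.out.println(e.getMessage());
        }
    }
}
{code}

Because the acknowledge call throws before the completed entries are cleared, the stale ackId stays in the restored state in this model, so every later checkpoint would fail the same way, which is consistent with the error repeating.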

In my opinion, these ackIds stored in the savepoint are the root cause of our inability to change subscriptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)