You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Daniel Lindeman (Jira)" <ji...@apache.org> on 2022/03/31 19:20:00 UTC

[jira] [Created] (BEAM-14223) PubsubLiteIO makes excessive calls to GCP google.cloud.pubsublite.v1.AdminService.GetTopicPartitions

Daniel Lindeman created BEAM-14223:
--------------------------------------

             Summary: PubsubLiteIO makes excessive calls to GCP google.cloud.pubsublite.v1.AdminService.GetTopicPartitions
                 Key: BEAM-14223
                 URL: https://issues.apache.org/jira/browse/BEAM-14223
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp
            Reporter: Daniel Lindeman


As per [https://the-asf.slack.com/archives/C9H0YNP3P/p1648752819277569?thread_ts=1648749667.880749&cid=C9H0YNP3P] [~ibzib], tagging [~dpcollins-google] 

When using PubsubLiteIO.read, the GCP admin client makes excessive calls to google.cloud.pubsublite.v1.AdminService.GetTopicPartitions. In a pipeline that ran for 1 minute and 55 seconds this call is made over 1000 times which results in the quota for this API being reached – resulting in job failure.

I looked into the history of the module, and I noticed that partition settings were exposed before 2.34.0, but have been removed. [https://github.com/apache/beam/commit/8a646aaa95e79a3f33dff204a659c8a221069ffe#diff-9828d5eb9f2fc844e12c0ebc87b3ffe12d7c4db5d9284a34b979598cf8fc6313R129]

!image-2022-03-31-15-11-09-570.png!


To replicate:
```

Pipeline p = Pipeline.create(options);
SubscriberOptions opts = SubscriberOptions.newBuilder()
.setSubscriptionPath(SubscriptionPath.parse("projects/<project-id>/locations/us-west1-a/subscriptions/<my-subscription>"))
.build();
p.apply(PubsubLiteIO.read(opts));
```



Logs:
```
2022-03-29 15:31:02
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:645)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.transforms.Watch$WatchGrowthFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:123)
    at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
    at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.processPendingProcessingTimeTimers(DoFnOperator.java:1356)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.close(DoFnOperator.java:587)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.close(SplittableDoFnOperator.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:213)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:210)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:185)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:162)
    ... 9 more
Caused by: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:57)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
    at io.grpc.Status.asRuntimeException(Status.java:535)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

```

 

Flink version: 1.13.5
Beam version: 2.34.0



--
This message was sent by Atlassian Jira
(v8.20.1#820001)