Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2019/05/08 06:52:00 UTC

[jira] [Commented] (FLINK-12376) GCS runtime exn: Request payload size exceeds the limit

    [ https://issues.apache.org/jira/browse/FLINK-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835366#comment-16835366 ] 

Stephan Ewen commented on FLINK-12376:
--------------------------------------

This is actually not a GCS / checkpoint issue; it is an issue in the source (probably the PubSub connector?).

The checkpoint completes, then Flink notifies the source that the checkpoint is complete and the source task acks some IDs back. That ack message is too large for the PubSub client's RPC service.

I think we need to rethink how the PubSub source works. It seems that keeping the IDs and acknowledging a large number of records in a single call is not feasible in a stable way.

I am not a PubSub expert, but is there a way to keep something like a sequence number (or vector of sequence numbers), similar to Kafka's offsets? 
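
As a stopgap for the payload limit, the acknowledge call could be split into bounded chunks instead of one giant request. A minimal sketch (not the current connector code; the method name and chunk size are illustrative, and it assumes the Pub/Sub {{SubscriberStub}} / {{AcknowledgeRequest}} API that the gRPC client exposes):

{code:java}
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.AcknowledgeRequest;
import java.util.List;

// Hypothetical helper: acknowledge in bounded chunks so that no single
// Acknowledge RPC exceeds the ~512 KiB gRPC request payload limit.
void acknowledgeInChunks(SubscriberStub stub, String subscription, List<String> ackIds) {
    // Assumed chunk size; ack IDs are at most a few hundred bytes each,
    // so a few hundred per request should stay well below the limit.
    final int maxIdsPerRequest = 500;
    for (List<String> chunk : Lists.partition(ackIds, maxIdsPerRequest)) {
        AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
                .setSubscription(subscription)
                .addAllAckIds(chunk)
                .build();
        stub.acknowledgeCallable().call(request);
    }
}
{code}

Chunking only works around the symptom, though; an offset-style acknowledgement (if Pub/Sub offers an equivalent) would avoid keeping per-record IDs in the first place.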

> GCS runtime exn: Request payload size exceeds the limit
> -------------------------------------------------------
>
>                 Key: FLINK-12376
>                 URL: https://issues.apache.org/jira/browse/FLINK-12376
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.7.2
>         Environment: FROM flink:1.8.0-scala_2.11
> ARG version=0.17
> ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib
> COPY target/analytics-${version}.jar /opt/flink/lib/analytics.jar
>            Reporter: Henrik
>            Priority: Major
>         Attachments: Screenshot 2019-04-30 at 22.32.34.png
>
>
> I'm trying to use the Google Cloud Storage file system, but it would seem that the Flink / GCS client libs are creating too-large requests far down in the GCS Java client.
> The Java client is added to the lib folder with this command in Dockerfile (probably [hadoop2-1.9.16|https://search.maven.org/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop2-1.9.16/jar] at the time of writing):
>  
> {code:java}
> ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib{code}
> This is the crash output. The key lines are:
> {code:java}
> java.lang.RuntimeException: Error while confirming checkpoint{code}
> and
> {code:java}
>  Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.{code}
> Full stacktrace:
>  
> {code:java}
> [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED.
> [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error while confirming checkpoint
> [analytics-867c867ff6-l622h taskmanager]     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [analytics-867c867ff6-l622h taskmanager]     at java.lang.Thread.run(Thread.java:748)
> [analytics-867c867ff6-l622h taskmanager] Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958)
> [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:694)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [analytics-867c867ff6-l622h taskmanager]     ... 3 more
> [analytics-867c867ff6-l622h taskmanager]     Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
> [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
> [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
> [analytics-867c867ff6-l622h taskmanager]         at okr.sources.PubSubSource.acknowledgeSessionIDs(PubSubSource.java:122)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs(MultipleIdsMessageAcknowledgingSourceBase.java:122)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.notifyCheckpointComplete(MessageAcknowledgingSourceBase.java:231)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
> [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1206)
> [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]         ... 3 more
> {code}
> The file system is configured as follows in `conf/flink-conf.yaml`:
>  
> {code:java}
> state.backend: rocksdb
> state.checkpoints.num-retained: 3
> state.checkpoints.dir: gs://example_bucket/flink/checkpoints
> state.savepoints.dir: gs://example_bucket/flink/savepoints
> state.backend.incremental: true
> {code}
> ...and the checkpoints that are created before the crash are small in size:
>  
> !Screenshot 2019-04-30 at 22.32.34.png!
> I'll be testing with Flink 1.8.0 as well.
> The pom.xml config:
> {code:java}
> <!-- https://stackoverflow.com/questions/51860988/flink-checkpoints-to-google-cloud-storage -->
> <!-- https://search.maven.org/search?q=a:flink-statebackend-rocksdb_2.11 -->
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <!-- https://search.maven.org/search?q=g:com.google.cloud.bigdataoss -->
> <!-- https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md -->
> <!-- Cloud Storage: -->
> <dependency>
>   <groupId>com.google.cloud.bigdataoss</groupId>
>   <artifactId>gcs-connector</artifactId>
>   <version>hadoop2-1.9.16</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> {code}
>  


