You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Natan HP (Jira)" <ji...@apache.org> on 2022/04/25 11:02:00 UTC

[jira] [Created] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application

Natan HP created FLINK-27395:
--------------------------------

             Summary: IllegalStateException: Could not find policy 'pick_first'. on Flink Application
                 Key: FLINK-27395
                 URL: https://issues.apache.org/jira/browse/FLINK-27395
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Google Cloud PubSub
    Affects Versions: 1.14.4, 1.14.2
         Environment: # Minikube

{noformat}
➜ minikube version
minikube version: v1.25.2
commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7
{noformat}

 # Apache Flink Docker Image

{noformat}
apache/flink:1.14.4-scala_2.11{noformat}
            Reporter: Natan HP


I got this exception on flink taskmanager, but I can see that the data is successfully published in the pub sub. Here is the log:

 
{noformat}
2022-04-25 07:53:44,293 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched from INITIALIZING to RUNNING.
Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure its implementation is either registered to LoadBalancerRegistry or included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
    at io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
    at io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
    at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
    at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
    at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
    at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
    at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
    at com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
    at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
    at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
    at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
    at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
    at com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
    at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
    at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
    at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
    at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
    at com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
    at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
    at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
    at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
    at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
    at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
    at com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
    at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
    at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
{noformat}
 

 

The code sample:
{code:java}
SinkFunction<String> pubsubSink = PubSubSink.newBuilder() .withSerializationSchema((SerializationSchema<String>) s -> s.getBytes(StandardCharsets.UTF_8)) .withProjectName("<project-name>") .withTopicName("<topic-name>")
.build(); dataStream.addSink(pubsubSink) .name("Pub-sub-sink");  {code}
 

I use Maven Assembly Plugin to create the uber JAR:
{noformat}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.6</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.example.flink.Main</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>{noformat}
 

The content of the JAR:
{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider       
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider.class

➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider 
io/grpc/NameResolverProvider.class
META-INF/services/io.grpc.NameResolverProvider
{noformat}
 

What I've tried to solve this:
 # Downgrading version to 1.14.2
{noformat}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
    <version>1.14.2</version>
</dependency>{noformat}

 # Using maven shade plugin (along side maven assembly plugin) with the following config as suggedted in [here|[https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-474739796]:]

{noformat}
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.LoadBalancerProvider</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.NameResolverProvider</file>
</transformer>{noformat}
3. Creating files inside META-INF/services as suggested in [here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]:

{noformat}
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on  master [⇡!+?] 
➜ ls
io.grpc.LoadBalancerProvider  io.grpc.NameResolverProviderData-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on  master [⇡!+?] 
➜ cat io.grpc.LoadBalancerProvider
io.grpc.internal.PickFirstLoadBalancerProvider

Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on  master [⇡!+?] 
➜ cat io.grpc.NameResolverProvider
io.grpc.internal.DnsNameResolverProvider              
{noformat}
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)