You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Filip Karnicki <fi...@gmail.com> on 2021/02/10 15:46:35 UTC

statefun: Unable to find a source translation for ingress

Hi, I modified the Stateful Functions 2.2.0 asyc example to include a real
binding to kafka, I included statefun-flink-distribution and
stateful-kafka-io in the pom and I created a fat jar using the
maven-assembly-plugin,

and my flink cluster complains about:

java.lang.IllegalStateException: Unable to find a source translation for
ingress of type IngressType(statefun.kafka.io, universal-ingress), which is
bound for key IngressIdentifier(org.apache.flink.statefun.examples.async,
tasks, class
org.apache.flink.statefun.examples.async.events.TaskStartedEvent)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.sourceFromSpec(IngressToSourceFunctionTranslator.java:45)
org.apache.flink.statefun.flink.core.common.Maps.transformValues(Maps.java:54)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.translate(IngressToSourceFunctionTranslator.java:37)
org.apache.flink.statefun.flink.core.translation.Sources.ingressToSourceFunction(Sources.java:117)
org.apache.flink.statefun.flink.core.translation.Sources.create(Sources.java:52)
org.apache.flink.statefun.flink.core.translation.FlinkUniverse.configure(FlinkUniverse.java:44)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:74)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 12 more


Does anyone have any idea why this wouldn't work on a cluster, yet is
completely fine when I'm using the test harness with a real kafka?

Many thanks
Fil

Re: statefun: Unable to find a source translation for ingress

Posted by Igal Shilman <ig...@ververica.com>.
Hello,

I believe that your assembly plugin configuration doesn't merge files under
META-INF/services.
Can you unzip your jar and examin manually the contents of:
META-INF/services/org.apache.flink.statefun.flink.io.spi.FlinkIoModule

It should include at least the following lines:
org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
org.apache.flink.statefun.flink.io.kafka.KafkaFlinkIoModule
org.apache.flink.statefun.flink.io.kinesis.KinesisFlinkIOModule

If you are okay with using an alternative plugin, take a look at the
maven-shade-plugin, and how we
use it to obtain this task [1].

Side note 1: If you can, please use statefun 2.2.2 instead of 2.2.0, as it
fixed an important bug.
Side note 2: If you _must_ submit your statefun job to an existing cluster,
then consider using the DataStream integration API [2]

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-distribution/pom.xml#L178,L180
[2]
https://github.com/apache/flink-statefun/blob/d1744eaa888a530edf102396675dfa4377489560/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L45

Good luck!
Igal.



On Wed, Feb 10, 2021 at 4:47 PM Filip Karnicki <fi...@gmail.com>
wrote:

> Hi, I modified the Stateful Functions 2.2.0 asyc example to include a real
> binding to kafka, I included statefun-flink-distribution and
> stateful-kafka-io in the pom and I created a fat jar using the
> maven-assembly-plugin,
>
> and my flink cluster complains about:
>
> java.lang.IllegalStateException: Unable to find a source translation for
> ingress of type IngressType(statefun.kafka.io, universal-ingress), which
> is bound for key
> IngressIdentifier(org.apache.flink.statefun.examples.async, tasks, class
> org.apache.flink.statefun.examples.async.events.TaskStartedEvent)
>
> org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.sourceFromSpec(IngressToSourceFunctionTranslator.java:45)
>
> org.apache.flink.statefun.flink.core.common.Maps.transformValues(Maps.java:54)
>
> org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.translate(IngressToSourceFunctionTranslator.java:37)
>
> org.apache.flink.statefun.flink.core.translation.Sources.ingressToSourceFunction(Sources.java:117)
>
> org.apache.flink.statefun.flink.core.translation.Sources.create(Sources.java:52)
>
> org.apache.flink.statefun.flink.core.translation.FlinkUniverse.configure(FlinkUniverse.java:44)
>
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:74)
>
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 12 more
>
>
> Does anyone have any idea why this wouldn't work on a cluster, yet is
> completely fine when I'm using the test harness with a real kafka?
>
> Many thanks
> Fil
>