Posted to user@flink.apache.org by Barry Higgins <ba...@gmail.com> on 2021/09/09 15:21:53 UTC

DataStreamAPI and Stateful functions

Hi, 
I'm looking at using the DataStream API from a Flink application against a remote Python stateful function deployed on another machine. I would like to investigate how feasible it is to have all of the state management handled on the calling side, meaning that we don't need another installation of Flink to manage the stateful functions.

Unfortunately the example referenced in the documentation: https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/

no longer exists:
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java

There is an older version that is available here:
https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example

and I have tried to work with this, without much success.

The calling element of the code looks as follows:

   StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(names)
            .withFunctionProvider(GREET, unused -> new MyFunction())
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                    .withPersistedState("seen_count")
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env); 
			
with a reference to a FunctionProvider that exists as an inner class of the same class. We would like this to be a remote call, where I assume I would replace http://localhost:5000/statefun with the remote address of the stateful function.
However, when I make such a change, the code still invokes the inner function, and any changes to the local MyFunction class are reflected in the results regardless of what is deployed remotely.
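
For example, I would have expected a variant along these lines, with the local provider removed, to force the remote call (same identifiers as above; the host in the URL is a placeholder):

    // Sketch only: the same pipeline with ONLY the remote function bound.
    // With withFunctionProvider(GREET, ...) removed, nothing can resolve to
    // the local inner class. REMOTE_GREET, GREETINGS, names, statefunConfig
    // and env are assumed to be defined as in the snippet above.
    StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(names)
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET, URI.create("http://remote-host:5000/statefun"))
                    .withPersistedState("seen_count")
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env);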

If anyone has a working example of how to interact via DataStreams with a remotely deployed SF, I would be very grateful. I would be very happy to update the documentation if I can get this working.
Cheers,
Barry


Re: DataStreamAPI and Stateful functions

Posted by Barry Higgins <ba...@gmail.com>.
Thanks Igal, 
I appreciate you coming back to me. I quickly tried the fat jar solution as you described, and I'm running into an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'StatefulFunctions'.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'StatefulFunctions'.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
        at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:84)
        at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Not found.]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)

I will keep looking at it as I'm sure it's something stupid but unfortunately I will be away next week and might not get it working before I leave.
Thanks,
Barry

On 2021/09/10 12:37:32, Igal Shilman <ig...@apache.org> wrote: 
> Hello Barry,
> 
> I assume that by "we don't need another installation of Flink to manage the
> stateful functions." You mean that you already have a running Flink cluster
> and you would like to submit an additional Flink Job that executes a
> Stateful functions application?
> 
> Then perhaps just try to submit [1] to the flink cluster. In addition you
> would have to make the module.yaml available in the class path.
> You can simply place it under your flink distribution /lib directory, or
> alternatively:
> 1. create a jar-with-dependencies (uber jar / fat jar) that has [1] as a
> dependency,
> 2. Add the module.yaml definition under src/resources.
> 3. Make sure that the main class will
> be org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> You should be able to submit it via either the web interface or by running:
> 
> ./bin/flink run -c
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> ./statefun-example.jar
> 
> If this approach doesn't work for you, let me know and we will figure out
> the DataStream integration approach.
> 
> All the best,
> Igal.
> 
> [1]
> https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-distribution/3.1.0

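The module.yaml Igal refers to would, in the 3.0-style layout, look roughly like the following (the namespace/name "example/greeter" and the endpoint URL are placeholders, not values from this thread; check the schema for your StateFun version):

    # Sketch only: a minimal remote-module definition in the 3.0-style layout.
    version: "3.0"

    module:
      meta:
        type: remote
      spec:
        endpoints:
          - endpoint:
              meta:
                kind: http
              spec:
                functions: example/greeter
                urlPathTemplate: http://remote-host:5000/statefun
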
Re: DataStreamAPI and Stateful functions

Posted by Igal Shilman <ig...@apache.org>.
No worries!
Glad everything worked out!

Cheers,
Igal


Re: DataStreamAPI and Stateful functions

Posted by Barry Higgins <ba...@gmail.com>.
Hi Igal,
Apologies, you are correct; I had my wires crossed. I had been trying to get everything working through my local IDE before I deployed to our Ververica cluster.
I was only able to get the code running through IntelliJ by following the steps below. Once I reverted the hack and changed the config on our cluster, everything worked perfectly.
Sorry for the confusion and thanks for all your help.
Barry


Re: DataStreamAPI and Stateful functions

Posted by Igal Shilman <ig...@apache.org>.
Hi Barry!
Glad to hear that it works for you!

I just didn't understand:
a) What is "flink.yaml"? Perhaps you are referring to "flink-conf.yaml"?
b) Why is it bundled with the distribution jar? I couldn't find it there
(nor should it be there).
I checked manually with:
jar tf statefun-flink-distribution-3.1.0.jar | grep "\.yaml"
and couldn't find it there.

Generally flink-conf.yaml should be part of your Flink runtime. For example
a file at /opt/flink/conf/flink-conf.yaml
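i.e., a cluster-side override such as the classloader setting discussed in this thread would live in that file, roughly (the path is an example; the value is the one quoted in this thread):

    # conf/flink-conf.yaml on the cluster (not inside the job jar)
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf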

Thanks,
Igal.




Re: DataStreamAPI and Stateful functions

Posted by Barry Higgins <ba...@gmail.com>.
Hi Igal,
I just wanted to give you an update on my deployment of stateful functions to an existing Flink cluster.
The good news is that it works now when I submit my config with the statefun-flink-distribution. Thank you very much for your help.
There was one gotcha, which came down to the requirement to update the flink.yaml to include:
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf

As the flink.yaml is bundled in the newly created fat jar, there is no easy way to edit that. My hacky solution was to code around that and create a new entrypoint class, which replicated the code in org.apache.flink.statefun.flink.core.StatefulFunctionsJob without the validation code that was stopping my deployment.
It may be easier if the flink.yaml in the statefun distribution dependency were shipped with the required value in it by default?
Thanks a million,
Barry

On 2021/09/10 12:37:32, Igal Shilman <ig...@apache.org> wrote: 
> Hello Barry,
> 
> I assume that by "we don't need another installation of Flink to manage the
> stateful functions." You mean that you already have a running Flink cluster
> and you would like to submit an additional Flink Job that executes a
> Stateful functions application?
> 
> Then perhaps just try to submit [1] to the flink cluster. In addition you
> would have to make the module.yaml available in the class path.
> You can simply place it under your flink distribution /lib directory, or
> alternatively:
> 1. create a jar-with-dependencies (uber jar / fat jar) that has [1] as a
> dependency,
> 2. Add  the module.yaml definition under src/resources.
> 3. Make sure that the main class will
> be org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> You should be able to submit it either the web interface, or by running:
> 
> ./bin/flink run -c
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> ./statefun-example.jar
> 
> If this approach doesn't work for you, let me know and we will figure out
> the DataStream integration approach.
> 
> All the best,
> Igal.
> 
> [1]
> https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-distribution/3.1.0
> 
> 
> 
> On Thu, Sep 9, 2021 at 5:22 PM Barry Higgins <ba...@gmail.com>
> wrote:
> 
> > Hi,
> > I'm looking at using the DataStream API from a Flink application against a
> > remote python stateful function deployed on another machine. I would like
> > to investigate how feasible it is to have all of the state management being
> > handled from the calling side meaning that we don't need another
> > installation of Flink to manage the stateful functions.
> >
> > Unfortunately the example referenced in the documentation:
> > https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> >
> > is no longer in existence:
> >
> > https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> >
> > There is an older version that is available here:
> >
> > https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example
> >
> > and I have tried to work with this without much success.
> >
> > The calling element of the code looks as follows:
> >
> >    StatefulFunctionEgressStreams out =
> >         StatefulFunctionDataStreamBuilder.builder("example")
> >             .withDataStreamAsIngress(names)
> >             .withFunctionProvider(GREET, unused -> new MyFunction())
> >             .withRequestReplyRemoteFunction(
> >                 requestReplyFunctionBuilder(
> >                         REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
> >                     .withPersistedState("seen_count")
> >                     .withMaxRequestDuration(Duration.ofSeconds(15))
> >                     .withMaxNumBatchRequests(500))
> >             .withEgressId(GREETINGS)
> >             .withConfiguration(statefunConfig)
> >             .build(env);
> >
> > with a reference to a FunctionProvider that exists as an inner class in
> > the same class. We would like this to be a remote call, where I guess I
> > would replace http://localhost:5000/statefun with the remote address of
> > the SF.
> > However when I do make such a change the code is still referring to the
> > inner function and any changes to the local MyFunction class are returned
> > regardless of what is deployed remotely.
> >
> > If anyone has a working example of how to interact via DataStreams with a
> > remotely deployed SF, I would be very grateful. I would be very happy to
> > update the documentation if I can get this working.
> > Cheers,
> > Barry
> >
> >
> 
