Posted to user@beam.apache.org by Lydian <ly...@gmail.com> on 2022/09/20 08:45:34 UTC

UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the latest
version of Beam (2.41.0).
The job is running on Kubernetes. I launched the job manager with a sidecar
container (using image apache/beam_flink1.14_job_server:2.41.0) to start
the expansion service with the following command:
```
java \
  -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService \
  8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=EXTERNAL \
  --defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
    consumer_config={
        "bootstrap.servers": 'BROKER',
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.jaas.config": (
            f'org.apache.kafka.common.security.scram.ScramLoginModule required '
            f'username="{sasl_username}" password="{sasl_password}";'
        ),
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097",
)
```
But it fails with this error:
```
2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```
Does anyone know how I could fix this issue? Thanks!

Sincerely,
Lydian Lee

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Posted by Lydian <ly...@gmail.com>.
I found a related thread:
https://www.mail-archive.com/user@beam.apache.org/msg06181.html
It turned out that the job server and the task manager have to share the same
volume, so that the artifacts staged by the job server are visible at the
same local path on the task manager. After adding a shared PVC to both the
job manager and the task manager, the problem is solved.

It still seems weird to me, though, because the error log complains about a
missing staging artifact on the task manager, yet I am able to find the
same file there. I am wondering why the shared volume is required.
For building the fat jar, I did try using BeamJarExpansionService:
```
BeamJarExpansionService(
    'sdks:java:io:expansion-service:shadowJar',
    append_args=[
        '--defaultEnvironmentType=PROCESS',
        '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
        '--experiments=use_unbounded_sdf_wrapper',
    ])
```
which, according to the docs, builds the shadow jar for me, but that also
failed with the same error unless the task manager and the job server use
the same PVC.
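
For reference, wiring that service object into the read then looks roughly
like this (a sketch; the consumer config and topic are the same as in my
first message):
```
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms.external import BeamJarExpansionService

expansion_service = BeamJarExpansionService(
    'sdks:java:io:expansion-service:shadowJar',
    append_args=[
        '--defaultEnvironmentType=PROCESS',
        '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
    ])

# Same read as before, but pointed at this service object instead of the
# sidecar's localhost:8097 endpoint.
ReadFromKafka(
    consumer_config=consumer_config,   # as in my first message
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service=expansion_service,
)
```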



Sincerely,
Lydian Lee

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Posted by Lydian <ly...@gmail.com>.
Yes, the logs are from the TaskManager.  The command line options are:
```
--streaming
--runner=portableRunner
--environment_type=PROCESS
--environment_config={"command": "/opt/apache/beam/boot"}
```
Note that I actually copied both the Python and Java SDKs into the same
Docker image, used for both the job manager and task manager in Flink.

```
COPY --from=apache/beam_python3.7_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
RUN mv /opt/apache/beam_java/boot /opt/apache/beam/java_boot && \
    cp -r /opt/apache/beam_java/* /opt/apache/beam/
```
Therefore, the actual command to start the expansion service is:
```
java -cp /opt/apache/beam/jars/* \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/java_boot\"}
```

I am not familiar with Java. Would you mind recommending how to pack all
the dependencies (or how to find out what the dependencies are) into a
single jar? Thanks!


Sincerely,
Lydian Lee

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Lydian,

I'm not sure about this one. Can you please clarify - the logs you shared 
are from the TaskManager, right? And can you please share the command line 
options you pass to the pipeline?

I'm not sure why the SDK harness would need the job server; that seems 
strange. Can you let the Python x-lang transform start its own expansion 
service using the default expansion service? That would mean you have to:

  a) pack the expansion service with dependencies into a single jar 
(shadow jar)

  b) create something like the get_expansion_service function in [1]

  c) pass this expansion service to ReadFromKafka(..., 
expansion_service=get_expansion_service())

That should start the expansion service locally, on the machine where you 
run your Python main method and submit the job.
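
A minimal sketch of what (a)-(c) could look like, modeled on the helper in 
[1] (the shadow-jar target and the PROCESS command path are examples and 
depend on your setup):
```
from apache_beam.transforms.external import BeamJarExpansionService

def get_expansion_service(
        jar='sdks:java:io:expansion-service:shadowJar', args=None):
    # (a)+(b): BeamJarExpansionService resolves the named Beam shadow jar
    # (downloading it if necessary) and starts an expansion service from it.
    if args is None:
        args = [
            '--defaultEnvironmentType=PROCESS',
            # Example path to a Java SDK harness boot binary in your image.
            '--defaultEnvironmentConfig='
            '{"command": "/opt/apache/beam/java_boot"}',
        ]
    return BeamJarExpansionService(jar, append_args=args)

# (c): ReadFromKafka(..., expansion_service=get_expansion_service())
```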

   Jan

[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py


Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Posted by Lydian <ly...@gmail.com>.
Thanks Jan! I am now building my TaskManager image with
```
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
```
and starting the expansion service with:
```
java -cp /opt/apache/beam/jars/* \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
```
But now I get another error, which seems related to artifact staging:
```
2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
        at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
        at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
        at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
        at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

Does anyone know what else I would need to configure? Thanks!


Sincerely,
Lydian Lee

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Lydian,

there are two parts involved.

  a) the expansion service (which you run on port 8097) - this service 
expands the ReadFromKafka transform, which is a Java transform

  b) the Java SDK environment, which is not the expansion service; it must 
be some environment that is able to run the Java ReadFromKafka 
transform. In Flink, you can use the PROCESS environment type (e.g. [1]), 
but there might be other options (e.g. DOCKER), see [2]; a sketch of the 
corresponding pipeline options is just below
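
For example, selecting the PROCESS environment from the Python side is done 
via pipeline options along these lines (a sketch; the boot command path 
depends on how your worker image was built):
```
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--environment_type=PROCESS',
    # Path to the SDK-harness boot binary inside the TaskManager image
    # (an example path; see [2] for the available environment types).
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])
```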

Hope this helps,

  Jan

[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py

[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
