Posted to user@flink.apache.org by David Causse <dc...@wikimedia.org> on 2023/04/05 09:49:49 UTC

flink-operator: jarURI seems mandatory

Hi,

I'm trying to deploy a job (Flink 1.16) with the flink-operator; the job
jar is part of the image and placed under /opt/flink/usrlib.
I thought that by placing the job jar there I could avoid setting the
jarURI in the JobSpec, but I'm getting an NPE (pasted at the end of this
email) suggesting that this param is actually mandatory.
Ultimately what I would like is to avoid pushing the job jar to the Flink
H/A storage, since it's part of the image.
I'm probably missing something regarding the use cases where jarURI can be
omitted.

Thanks!

David.

org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.NullPointerException
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:142)
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
        at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:145)
        at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:103)
        at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
        at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:102)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
        at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at org.apache.flink.kubernetes.utils.KubernetesUtils.checkJarFileForApplicationMode(KubernetesUtils.java:403)
        at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:207)
        at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
        at org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(NativeFlinkService.java:66)
        at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:184)
        at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:182)
        at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:59)
        at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
        ... 13 more

Re: flink-operator: jarURI seems mandatory

Posted by David Causse <dc...@wikimedia.org>.
Hi Weihua,

Thanks for your reply!

Even when I use jarURI: "local:///path/to/job.jar", I can see that the
classes loaded by the taskmanager come from something like:

file:/tmp/tm_flink-app-wdqs-taskmanager-1-10/blobStorage/job_94ec61137dc8814f4558116848dd29cc/blob_p-0037843784c2e3ee289c9cc3ec644bc0df2c1084-39e459efea8e0348e5a3c635e1c947c2

when logging
aRandomJobObject.getClass().getProtectionDomain().getCodeSource().getLocation()

This file does seem to be present on the object storage under
flink_ha_storage/flink-app-wdqs/blob/job_94ec61137dc8814f4558116848dd29cc/,
so it is uploaded at some point.
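The location check described above can be wrapped in a small probe and run against any class. This is a minimal sketch: `CodeSourceProbe` and `describeLocation` are hypothetical names, and it assumes no security manager blocks `getProtectionDomain()`. Classes loaded by the bootstrap loader (for example `java.lang.String`) have no code source, so that case is reported explicitly instead of failing.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports where the JVM loaded a class from. */
final class CodeSourceProbe {

    private CodeSourceProbe() {}

    /** Returns the jar or directory a class was loaded from, or a marker if unknown. */
    static String describeLocation(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Bootstrap classes (java.lang.String, ...) have no CodeSource, and some
        // in-memory class loaders define classes with a null location.
        if (source == null || source.getLocation() == null) {
            return "<unknown: no CodeSource>";
        }
        URL location = source.getLocation();
        return location.toString();
    }
}
```

Logging `CodeSourceProbe.describeLocation(aRandomJobObject.getClass())` from inside a task would show whether a class came from the blobStorage copy or from a path inside the image.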

I can confirm that I see lines like these in the taskmanager logs:
Loading configuration property: pipeline.jars,
local:///opt/flink/streaming-updater-producer.jar
[...]
Loading dynamic configuration property: pipeline.jars,
file:/opt/flink/streaming-updater-producer.jar

So I'm not sure what makes Flink use the H/A blob storage rather than
the path provided in the image.
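The two log lines suggest that the `local://` entry in pipeline.jars is turned into a plain `file:` URI for the same path. The sketch below only mimics that observed mapping with `java.net.URI`; `LocalJarUris` and `toFileUri` are hypothetical names, not Flink's implementation, and the sketch does not explain why the blob copy ends up on the classpath.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper mirroring the local:// -> file: rewrite seen in the logs. */
final class LocalJarUris {

    private LocalJarUris() {}

    /** Rewrites local:///abs/path.jar to file:/abs/path.jar; other URIs are returned as-is. */
    static URI toFileUri(URI jarUri) {
        if (!"local".equals(jarUri.getScheme())) {
            // s3://, https://, file: and so on are left untouched.
            return jarUri;
        }
        try {
            // With a null host, this constructor emits no "//" authority,
            // which yields the file:/opt/... form shown in the log line.
            return new URI("file", null, jarUri.getPath(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + jarUri, e);
        }
    }
}
```

For example, `LocalJarUris.toFileUri(URI.create("local:///opt/flink/streaming-updater-producer.jar"))` gives `file:/opt/flink/streaming-updater-producer.jar`, matching the second log line.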

Regarding jarURI being optional, I found
https://github.com/apache/flink-kubernetes-operator/pull/370, but I'm unsure
whether it is going to help in my case.

Thanks!

On Thu, Apr 6, 2023 at 7:38 AM Weihua Hu <hu...@gmail.com> wrote:

> Hi, David
>
> The jarURI is required[1], otherwise Flink doesn't know which jar should
> be used.
>
> If you are using application mode, you can set jarURI to
> "local:///opt/flink/usrlib/your-job.jar", and the jar will not be uploaded
> to H/A storage.
>
> Best,
> Weihua

Re: flink-operator: jarURI seems mandatory

Posted by Weihua Hu <hu...@gmail.com>.
Hi, David

The jarURI is required[1], otherwise Flink doesn't know which jar should be
used.

If you are using application mode, you can set jarURI to
"local:///opt/flink/usrlib/your-job.jar", and the jar will not be uploaded
to H/A storage.
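A hedged illustration of the two points above (some jar location must be given, and `local://` points at a file already inside the image): a pre-flight check that rejects a missing jarURI with a readable message rather than the NullPointerException raised in checkJarFileForApplicationMode. `JarUriCheck`, `requireJarUri` and `isInImage` are hypothetical names; this is not the operator's own validation.

```java
import java.net.URI;

/** Hypothetical pre-flight check for the jarURI of a job spec. */
final class JarUriCheck {

    private JarUriCheck() {}

    /** Rejects a missing or blank jarURI with a clear message, then parses it. */
    static URI requireJarUri(String jarUri) {
        if (jarUri == null || jarUri.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "jarURI must be set in the JobSpec, e.g. local:///opt/flink/usrlib/your-job.jar");
        }
        // URI.create throws IllegalArgumentException on malformed input.
        return URI.create(jarUri);
    }

    /** True when the URI uses the local scheme, i.e. the jar is expected inside the image. */
    static boolean isInImage(URI jarUri) {
        return "local".equals(jarUri.getScheme());
    }
}
```

For example, `JarUriCheck.isInImage(JarUriCheck.requireJarUri("local:///opt/flink/usrlib/your-job.jar"))` returns true, while an `s3://` URI returns false.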

Best,
Weihua
