Posted to user@flink.apache.org by liuxiangcao <xi...@gmail.com> on 2022/11/08 03:55:24 UTC

support escaping `#` in flink job spec in Flink-operator

Hi,

We have a job that contains `#` as part of mainArgs, and it used to work on
Ververica. Now that we are switching to our own control plane to deploy to
the flink-operator, the job has started to fail because the main args string
gets truncated at the `#` character when passed to the Flink application. I
believe this is because characters after `#` are interpreted as comments
in the YAML file. To support `#` in mainArgs, the flink operator
needs to escape `#` when generating the Kubernetes YAML file.

Assume the mainArgs contain "xyz#abc".
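For illustration only (this is not Flink's actual code), here is a minimal sketch of the kind of line-based config parsing that strips everything after `#` as a comment, which would truncate such a value exactly as in the error below:

```java
// Minimal sketch of naive line-based config parsing that treats
// everything after '#' as a comment. Not Flink's actual implementation;
// it only illustrates how the value "xyz#abc" can end up truncated.
public class NaiveConfParser {

    // Parses a single "key: value" line, dropping any '#' comment.
    static String parseValue(String line) {
        String noComment = line.split("#", 2)[0]; // strip from first '#'
        String[] kv = noComment.split(":", 2);    // split key and value
        return kv.length == 2 ? kv[1].trim() : "";
    }

    public static void main(String[] args) {
        // The '#' inside the quoted argument is treated as a comment
        // start, so only the leading '"xyz' part of the value survives.
        System.out.println(parseValue(
                "$internal.application.program-args: \"xyz#abc\""));
    }
}
```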

Here is the stack-trace:
{"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
not parse value '\"xyz' *(Note: truncated by #)*

for key  '$internal.application.program-args'.\n\tat
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
by: *java.lang.IllegalArgumentException: Could not split string.
Quoting was not closed properly*.\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
java.base/java.util.Optional.map(Optional.java:265)\n\tat
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not
create application
program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}


Can someone take a look and help fix this issue? Or I can help fix it
if someone can point me in the right direction.

-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re: support escaping `#` in flink job spec in Flink-operator

Posted by Maximilian Michels <mx...@apache.org>.
Taking a step back here: I think this needs to be handled in the
application mode in any case. Even if we had a better parser, it would
still treat # as a comment char. The application mode needs to be fixed to
come up with an escape scheme. YAML supports this via \# but that won't
work with our parser. So it needs to be something else. In the meantime, we
could at least add support for escapes in the configuration parser.
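As a rough illustration of what such an escape convention in the configuration parser could look like (purely a sketch, not behavior Flink has adopted), treating `\#` as a literal `#`:

```java
// Illustrative sketch of an escape convention a config parser could
// adopt: "\#" yields a literal '#', while a bare '#' starts a comment.
// This is not Flink's current behavior.
public class EscapedCommentStripper {

    static String stripComment(String line) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '\\' && i + 1 < line.length() && line.charAt(i + 1) == '#') {
                out.append('#'); // escaped '#': keep the literal character
                i++;             // skip the '#' we just consumed
            } else if (c == '#') {
                break;           // unescaped '#': rest of line is a comment
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```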

CC dev mailing list

-Max

On Tue, Nov 8, 2022 at 2:26 PM Maximilian Michels <mx...@apache.org> wrote:

> The job fails when starting because its arguments are passed through the
> Flink configuration in application deployment mode.
>
> >This is a known limit of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.
>
> Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource:
> https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189
>
> It's indeed a long-standing issue. We could easily replace the parsing
> logic with a standard YAML parser, we even have Jackson with YAML support
> built into flink-core. However, I think we worry that this might be
> breaking some applications which rely on the lenient behavior of the
> existing parser.
>
> -Max
>
> On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao <xi...@gmail.com>
> wrote:
>
>> Hi Yang,
>>
>> Do you think flink-conf not supporting `#` in FLINK-15358[1]  and Flink
>> job spec not supporting `#` are caused by some common code?   or maybe they
>> are in different code paths?  My first guess was they are in different
>> code paths. The flink-conf is parsed when starting the flink cluster while
>> job spec is parsed when starting the job application.
>>
>> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xi...@gmail.com>
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Thanks for getting back. Could you share how to submit job to
>>> flinkk8operator in json format?
>>>
>>> We use the java Fabric8 K8 client, which serializes java
>>> FlinkDeployment objects to CustomResource YAML (see the code snippet
>>> below).  Since `#` is considered a special character denoting comments in
>>> YAML,  it should be escaped properly when YAML file is generated. We are
>>> also reading into the code to see if we can identify the place for the fix.
>>>
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>>> import
>>> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>>
>>> FlinkDeployment deployment = xxxx;
>>> CustomResourceDefinitionContext context = xxx;
>>> DefaultKubernetesClient client = xxx;
>>>
>>>     client
>>>           .customResources(
>>>               context, FlinkDeployment.class, FlinkDeploymentList.class)
>>>           .inNamespace(xxx)
>>>           .withName(deploymentName)
>>>           .createOrReplace(deployment);
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>>> This is a known limit of the current Flink options parser. Refer to
>>>> FLINK-15358[1] for more information.
>>>>
>>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Gyula Fóra <gy...@gmail.com> 于2022年11月8日周二 14:41写道:
>>>>
>>>>> It is also possible that this is a problem of the Flink native
>>>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>>>> we try to fix it .
>>>>>
>>>>> We simply set the args into a Flink config and pass it to the native
>>>>> deployment logic in the operator.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gy...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> How do you submit your yaml?
>>>>>>
>>>>>> It’s possible that this is not operator problem. Did you try
>>>>>> submitting the deployment in json format instead?
>>>>>>
>>>>>> If it still doesn't work please open a JIRA ticket with the details
>>>>>> to reproduce and what you have tried :)
>>>>>>
>>>>>> Cheers
>>>>>> Gyula
>>>>>>
>>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>>>> work on Ververica. Now we are switching to our own control plane to deploy
>>>>>>> to flink-operaotor and the job started to fail due to the main args string
>>>>>>> getting truncated at `#` character when passed to flink application. I
>>>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>>>
>>>>>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>>>>>
>>>>>>> Here is the stack-trace:
>>>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>>>>
>>>>>>> for key  '$internal.application.program-args'.\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat java.base/java.util.Optional.map(Optional.java:265)\n\tat org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t... 5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>>>>
>>>>>>>
>>>>>>>  Can someone take a look and help fixing this issue? or I can help
>>>>>>> fixing this if someone can point me in the right direction.
>>>>>>>
>>>>>>> --
>>>>>>> Best Wishes & Regards
>>>>>>> Shawn Xiangcao Liu
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>

Re: support escaping `#` in flink job spec in Flink-operator

Posted by Maximilian Michels <mx...@apache.org>.
The job fails when starting because its arguments are passed through the
Flink configuration in application deployment mode.

>This is a known limit of the current Flink options parser. Refer to
FLINK-15358[1] for more information.

Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource:
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189

It's indeed a long-standing issue. We could easily replace the parsing
logic with a standard YAML parser; we even have Jackson with YAML support
built into flink-core. However, I think the worry is that this might
break some applications that rely on the lenient behavior of the
existing parser.

-Max

On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao <xi...@gmail.com> wrote:

> Hi Yang,
>
> Do you think flink-conf not supporting `#` in FLINK-15358[1]  and Flink
> job spec not supporting `#` are caused by some common code?   or maybe they
> are in different code paths?  My first guess was they are in different
> code paths. The flink-conf is parsed when starting the flink cluster while
> job spec is parsed when starting the job application.
>
> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xi...@gmail.com>
> wrote:
>
>> Hi Gyula,
>>
>> Thanks for getting back. Could you share how to submit job to
>> flinkk8operator in json format?
>>
>> We use the java Fabric8 K8 client, which serializes java FlinkDeployment objects
>> to CustomResource YAML (see the code snippet below).  Since `#` is
>> considered a special character denoting comments in YAML,  it should be
>> escaped properly when YAML file is generated. We are also reading into the
>> code to see if we can identify the place for the fix.
>>
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>> import
>> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>
>> FlinkDeployment deployment = xxxx;
>> CustomResourceDefinitionContext context = xxx;
>> DefaultKubernetesClient client = xxx;
>>
>>     client
>>           .customResources(
>>               context, FlinkDeployment.class, FlinkDeploymentList.class)
>>           .inNamespace(xxx)
>>           .withName(deploymentName)
>>           .createOrReplace(deployment);
>>
>>
>>
>>
>>
>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> This is a known limit of the current Flink options parser. Refer to
>>> FLINK-15358[1] for more information.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>
>>> Best,
>>> Yang
>>>
>>> Gyula Fóra <gy...@gmail.com> 于2022年11月8日周二 14:41写道:
>>>
>>>> It is also possible that this is a problem of the Flink native
>>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>>> we try to fix it .
>>>>
>>>> We simply set the args into a Flink config and pass it to the native
>>>> deployment logic in the operator.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gy...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> How do you submit your yaml?
>>>>>
>>>>> It’s possible that this is not operator problem. Did you try
>>>>> submitting the deployment in json format instead?
>>>>>
>>>>> If it still doesn't work please open a JIRA ticket with the details to
>>>>> reproduce and what you have tried :)
>>>>>
>>>>> Cheers
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>>> work on Ververica. Now we are switching to our own control plane to deploy
>>>>>> to flink-operaotor and the job started to fail due to the main args string
>>>>>> getting truncated at `#` character when passed to flink application. I
>>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>>
>>>>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>>>>
>>>>>> Here is the stack-trace:
>>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>>>
>>>>>> for key  '$internal.application.program-args'.\n\tat
>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat java.base/java.util.Optional.map(Optional.java:265)\n\tat org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t... 5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>>>
>>>>>>
>>>>>>  Can someone take a look and help fixing this issue? or I can help
>>>>>> fixing this if someone can point me in the right direction.
>>>>>>
>>>>>> --
>>>>>> Best Wishes & Regards
>>>>>> Shawn Xiangcao Liu
>>>>>>
>>>>>
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>

Re: support escaping `#` in flink job spec in Flink-operator

Posted by liuxiangcao <xi...@gmail.com>.
Hi Yang,

Do you think flink-conf not supporting `#` (FLINK-15358[1]) and the Flink
job spec not supporting `#` are caused by common code, or are they on
different code paths? My first guess is that they are different: flink-conf
is parsed when starting the Flink cluster, while the job spec is parsed
when starting the job application.

On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xi...@gmail.com> wrote:

> Hi Gyula,
>
> Thanks for getting back. Could you share how to submit job to
> flinkk8operator in json format?
>
> We use the java Fabric8 K8 client, which serializes java FlinkDeployment objects
> to CustomResource YAML (see the code snippet below).  Since `#` is
> considered a special character denoting comments in YAML,  it should be
> escaped properly when YAML file is generated. We are also reading into the
> code to see if we can identify the place for the fix.
>
> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
> import
> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>
> FlinkDeployment deployment = xxxx;
> CustomResourceDefinitionContext context = xxx;
> DefaultKubernetesClient client = xxx;
>
>     client
>           .customResources(
>               context, FlinkDeployment.class, FlinkDeploymentList.class)
>           .inNamespace(xxx)
>           .withName(deploymentName)
>           .createOrReplace(deployment);
>
>
>
>
>
> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <da...@gmail.com> wrote:
>
>> This is a known limit of the current Flink options parser. Refer to
>> FLINK-15358[1] for more information.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>
>> Best,
>> Yang
>>
>> Gyula Fóra <gy...@gmail.com> 于2022年11月8日周二 14:41写道:
>>
>>> It is also possible that this is a problem of the Flink native
>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>> we try to fix it .
>>>
>>> We simply set the args into a Flink config and pass it to the native
>>> deployment logic in the operator.
>>>
>>> Gyula
>>>
>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> How do you submit your yaml?
>>>>
>>>> It’s possible that this is not operator problem. Did you try submitting
>>>> the deployment in json format instead?
>>>>
>>>> If it still doesn't work please open a JIRA ticket with the details to
>>>> reproduce and what you have tried :)
>>>>
>>>> Cheers
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>> work on Ververica. Now we are switching to our own control plane to deploy
>>>>> to flink-operaotor and the job started to fail due to the main args string
>>>>> getting truncated at `#` character when passed to flink application. I
>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>
>>>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>>>
>>>>> Here is the stack-trace:
>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>>
>>>>> for key  '$internal.application.program-args'.\n\tat
>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat java.base/java.util.Optional.map(Optional.java:265)\n\tat org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t... 5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>>
>>>>>
>>>>>  Can someone take a look and help fixing this issue? or I can help
>>>>> fixing this if someone can point me in the right direction.
>>>>>
>>>>> --
>>>>> Best Wishes & Regards
>>>>> Shawn Xiangcao Liu
>>>>>
>>>>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>


-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re: support escaping `#` in flink job spec in Flink-operator

Posted by liuxiangcao <xi...@gmail.com>.
Hi Gyula,

Thanks for getting back. Could you share how to submit a job to the
flink-kubernetes-operator in JSON format?

We use the Java Fabric8 Kubernetes client, which serializes
FlinkDeployment objects to CustomResource YAML (see the code snippet
below). Since `#` is a special character that starts comments in YAML,
it should be escaped properly when the YAML file is generated. We are also
reading through the code to see if we can identify the right place for a fix.

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;

FlinkDeployment deployment = xxxx;
CustomResourceDefinitionContext context = xxx;
DefaultKubernetesClient client = xxx;

client
    .customResources(context, FlinkDeployment.class, FlinkDeploymentList.class)
    .inNamespace(xxx)
    .withName(deploymentName)
    .createOrReplace(deployment);
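Worth noting: per the YAML spec, `#` begins a comment only at the start of a line or when preceded by whitespace, so an embedded `xyz#abc` is a valid plain scalar even unquoted; the truncation we see comes from stripping at any `#`. A minimal sketch contrasting the two rules (illustrative only, not code from either project):

```java
// Sketch contrasting two comment-stripping rules. Per the YAML spec,
// '#' starts a comment only at line start or after whitespace; a naive
// rule truncates at any '#'. Illustrative only.
public class CommentRules {

    // Spec-style: '#' is a comment only at index 0 or after whitespace.
    static String specStyle(String line) {
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == '#'
                    && (i == 0 || Character.isWhitespace(line.charAt(i - 1)))) {
                return line.substring(0, i);
            }
        }
        return line;
    }

    // Naive: truncate at the first '#', wherever it appears.
    static String naive(String line) {
        return line.split("#", 2)[0];
    }
}
```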






-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re: support escaping `#` in flink job spec in Flink-operator

Posted by Yang Wang <da...@gmail.com>.
This is a known limitation of the current Flink options parser. Refer to
FLINK-15358[1] for more information.

[1]. https://issues.apache.org/jira/browse/FLINK-15358

Best,
Yang
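The "Quoting was not closed properly" error in the stack trace follows directly from the truncation: once everything after `#` is stripped, the remaining value `"xyz` has an opening quote with no closing one. A simplified sketch of the quoted-splitting step (loosely modeled on Flink's `StructuredOptionsSplitter`, not the actual Flink code) reproduces the failure:

```java
import java.util.ArrayList;
import java.util.List;

public class QuotedSplitSketch {
    // Split on ';', treating double-quoted segments as single tokens.
    // Loosely modeled on Flink's StructuredOptionsSplitter; simplified.
    static List<String> splitEscaped(String s) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : s.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ';' && !inQuotes) {
                tokens.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (inQuotes) {
            // This is the condition behind the error in the stack trace.
            throw new IllegalArgumentException(
                "Could not split string. Quoting was not closed properly.");
        }
        tokens.add(current.toString());
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(splitEscaped("\"xyz#abc\"")); // prints [xyz#abc]
        try {
            splitEscaped("\"xyz"); // truncated value: quote never closed
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So the full value with balanced quotes would parse fine; it is only after the `#` truncation that the splitter sees an unterminated quote and throws.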


Re: support escaping `#` in flink job spec in Flink-operator

Posted by Gyula Fóra <gy...@gmail.com>.
It is also possible that this is a problem of the Flink native Kubernetes
integration; we have to check where exactly it goes wrong before we try to
fix it.

We simply set the args into a Flink config and pass it to the native
deployment logic in the operator.

Gyula


Re: support escaping `#` in flink job spec in Flink-operator

Posted by Gyula Fóra <gy...@gmail.com>.
Hi!

How do you submit your yaml?

It’s possible that this is not an operator problem. Did you try submitting
the deployment in JSON format instead?

If it still doesn't work, please open a JIRA ticket with the details to
reproduce and what you have tried :)

Cheers
Gyula
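One way to test the JSON suggestion above: JSON has no comment syntax at all, so a `#` in mainArgs cannot be misread as a comment and needs no escaping. A minimal sketch (the `args` field name below is illustrative, not the exact CRD schema):

```java
public class JsonArgsSketch {
    // Minimal JSON string escaping for illustration. Note that '#' needs no
    // escaping at all, because JSON has no comment syntax.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Hypothetical fragment of a deployment spec; '#' passes through as-is.
        String fragment = "{\"args\": [" + jsonString("\"xyz#abc\"") + "]}";
        System.out.println(fragment); // prints {"args": ["\"xyz#abc\""]}
    }
}
```

If the same job submitted as JSON still fails, that would point at the Flink options parser (FLINK-15358) rather than the YAML serialization step.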

On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xi...@gmail.com> wrote:

> Hi,
>
> We have a job that contains `#` as part of mainArgs, and it used to work on
> Ververica. Now we are switching to our own control plane deploying to the
> flink-operator, and the job started to fail because the main args string
> gets truncated at the `#` character when passed to the Flink application. I
> believe this is because characters after `#` are interpreted as comments in
> the YAML file. To support `#` in the mainArgs, the flink operator needs to
> escape `#` when generating the k8s YAML file.
>
> Assume the mainArgs contain `"xyz#abc"`.
>
> Here is the stack trace:
>
> java.lang.IllegalArgumentException: Could not parse value '"xyz' *(Note:
> truncated by #)* for key '$internal.application.program-args'.
>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
>     at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
>     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)
>     at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
>     at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
>     at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
> Caused by: java.lang.IllegalArgumentException: *Could not split string.
> Quoting was not closed properly.*
>     at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
>     at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
>     at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
>     at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)
>     at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
>     at java.base/java.util.Optional.map(Optional.java:265)
>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)
>     ... 5 more
>
> (Logged as ERROR by org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> at 2022-11-07T18:40:03.369+00:00 with message "Could not create application
> program.")
>
>
>  Can someone take a look and help fix this issue? Or I can help fix it
> if someone can point me in the right direction.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>