Posted to user@flink.apache.org by Sharil Shafie <si...@gmail.com> on 2023/05/10 12:19:28 UTC

Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

Hi,

I took the Real Time Reporting with the Table API
<https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/table_api/>
example and deployed it on Kubernetes using Flink Kubernetes Operator 1.4.

When I set the job parallelism to 2, nothing is inserted into the
spend_report table and it stays empty. However, when I set parallelism to 1,
rows are inserted.

The problem is that there is no exception (that I can find) indicating
why nothing is inserted. The job appears to be running fine, and bytes and
records are being received (refer to the attached file). There is one
difference, however: a 'Low Watermark' value is shown when parallelism
is 1, and none is shown when parallelism is 2.

I have tried this with both MySQL and PostgreSQL and got the same outcome.

I am fairly new to Kubernetes and Flink. What am I missing?

The relevant files are below:

   - SpendReport.java <https://pastebin.com/HUMhWqUM> (taken from the Flink
   playgrounds, with modifications)
   - deployment_with_job.yaml <https://pastebin.com/8BTyega7> (taken from
   the example in the Kubernetes Operator repo, with modifications)
   - Log when Parallelism = 2 <https://pastebin.com/U04QZg75>
   - Log when Parallelism = 1 <https://pastebin.com/L0CC24Ke>
   - Screenshot of the task output for both settings (attached)


Regards.

Re: Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

Posted by Sharil Shafie <si...@gmail.com>.
Update:

I haven't tried your first suggestion but I managed to get it working by
setting up the source idle timeout.

Thanks for your help.

Regards.



Re: Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

Posted by Sharil Shafie <si...@gmail.com>.
Hi,

I believe there is only one partition, based on the log output below. I
didn't make any changes to the DataGenerator from the Flink playground.

2023-05-10 11:55:33,082 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - These configurations '[key.deserializer, commit.offsets.on.checkpoint, value.deserializer, enable.auto.commit, client.id.prefix, partition.discovery.interval.ms, auto.offset.reset]' were supplied but are not used yet.
2023-05-10 11:55:33,091 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 3.4.0
2023-05-10 11:55:33,093 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 2e1947d240607d53
2023-05-10 11:55:33,094 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1683719733083
2023-05-10 11:55:33,180 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Starting the KafkaSourceEnumerator for consumer group null without periodic partition discovery.
2023-05-10 11:55:33,682 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2023-05-10 11:55:34,074 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Creating new TaskManager pod with name flink-deploy-taskmanager-1-1 and resource <2048,1.0>.
2023-05-10 11:55:36,367 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Pod flink-deploy-taskmanager-1-1 is created.
2023-05-10 11:55:36,767 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Received new TaskManager pod: flink-deploy-taskmanager-1-1
2023-05-10 11:55:36,772 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker flink-deploy-taskmanager-1-1 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=537.600mb (563714445 bytes), taskOffHeapSize=0 bytes, networkMemSize=158.720mb (166429984 bytes), managedMemSize=634.880mb (665719939 bytes), numSlots=4}.
2023-05-10 11:55:38,006 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Discovered new partitions: [transactions-0]


Thanks for the suggestion. I'll have a look into that.

Regards.



Re: Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

How many partitions does your Kafka topic have?

One possibility is that the Kafka topic has only one partition.
When the source parallelism is set to 2, one of the source
tasks then has no data to consume and never generates a watermark, so
the downstream operator cannot align the watermarks and cannot
produce any output. [1]

You can check the records sent by each source sub-task.
If only one sub-task is producing output, you can set the source idle
timeout [2] so that the job does not wait forever for the missing watermark.
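
To make the reasoning above concrete, here is a minimal sketch in plain Java. It is a toy model only, not Flink's actual implementation: the class name WatermarkAlignmentSketch, the method combinedWatermark, and the sample timestamp are all made up for illustration. It assumes the downstream operator takes the minimum watermark over its non-idle inputs, which is the behaviour described in [1].

```java
import java.util.OptionalLong;

/** Toy model (not Flink code) of how a downstream operator combines source watermarks. */
public class WatermarkAlignmentSketch {

    /** Stand-in for "this subtask has not emitted any watermark yet". */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Returns the minimum watermark over all inputs that are not marked idle.
     * Returns empty when every input is idle (a simplification for this sketch).
     */
    public static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are ignored when aligning watermarks
            }
            if (!min.isPresent() || watermarks[i] < min.getAsLong()) {
                min = OptionalLong.of(watermarks[i]);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // Subtask 0 reads the only partition; subtask 1 has no partition assigned.
        long[] watermarks = {1_683_719_733_000L, NO_WATERMARK};

        // No idle timeout: the empty subtask still counts, so the minimum never advances.
        OptionalLong stalled = combinedWatermark(watermarks, new boolean[] {false, false});
        System.out.println("without idle marking, advanced: "
                + (stalled.getAsLong() > NO_WATERMARK));

        // Idle timeout elapsed: the empty subtask is skipped and the watermark advances.
        OptionalLong advancing = combinedWatermark(watermarks, new boolean[] {false, true});
        System.out.println("with idle marking, watermark: " + advancing.getAsLong());
    }
}
```

In the first call the combined watermark stays at the "no watermark" value, so a windowed aggregation downstream would never fire. In the second call the empty subtask is skipped, which is roughly what the source idle timeout [2] achieves once it elapses.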

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-idle-timeout

Best,
Weihua

