Posted to user@flink.apache.org by Prasanna kumar <pr...@gmail.com> on 2020/05/28 17:52:45 UTC

Running and Maintaining Multiple Jobs

Hi,

I have a list of jobs that need to be run via Flink.
For the PoC we are driving them from a JSON configuration file.
Sample JSON file:
{
  "registryJobs": [
    { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
    { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
    { "inputTopic": "billingsource", "outputTopic": "billing" },
    { "inputTopic": "costs", "outputTopic": "costschannel" },
    { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
  ]
}
But in the long run we do want to keep these details in an RDBMS.
There are many other properties per job, such as transformations, filters and
rules, which would be captured in the DB via a UI.

A Flink application supports a single execution environment, so I ended up
writing a JobGenerator module which reads this JSON and creates the jobs.

public static void generateJobs(Registry job, StreamExecutionEnvironment env) {

      Properties props = new Properties();
      props.put("bootstrap.servers", BOOTSTRAP_SERVER);
      props.put("client.id", "flink-example1");

      // Source: consume the registry entry's input topic
      FlinkKafkaConsumer011<String> fkC = new FlinkKafkaConsumer011<>(
            job.getInputTopic(), new SimpleStringSchema(), props);

      DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());

      // Transformation placeholder; its output feeds the sink below
      DataStream<String> mapped = stream.map( SOMEMAPCODE );

      // Sink: produce the transformed records to the registry entry's output topic
      mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(),
            new SimpleStringSchema(), props)).name("Kafka: " + job.getOutputTopic());
   }
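
For context, a minimal sketch of the surrounding driver that parses the registry
JSON and calls the method above once per entry (the RegistryFile POJO, the Jackson
usage and the file name are illustrative assumptions, not the actual module):

public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // Assumed POJO matching the JSON above:
      // class RegistryFile { List<Registry> registryJobs; /* getters/setters */ }
      ObjectMapper mapper = new ObjectMapper();
      RegistryFile registry = mapper.readValue(new File("registry.json"), RegistryFile.class);

      // Wire one source -> map -> sink pipeline per entry into the same environment;
      // this is why all entries end up as parallel pipelines inside a single Flink job
      for (Registry job : registry.getRegistryJobs()) {
            generateJobs(job, env);
      }

      env.execute("registry-jobs");
}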


This created 5 tasks in a single job, and it appears this way:

[image: Screen Shot 2020-05-28 at 11.15.32 PM.png]

Questions

1) Is this a good way to design it? We might end up having 500 - 1000 such
tasks a year or so down the line. Or is there another possible way?

2) We cannot afford downtime in our system. Say 5 tasks are pushed to
production and we later need to add or update tasks: should we restart the
cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the
DB directly and create the jobs (DAGs) dynamically without restarting?

Thanks,
Prasanna.

Re: Running and Maintaining Multiple Jobs

Posted by Yun Tang <my...@live.com>.
Hi Prasanna

As far as I know, Flink does not allow submitting a new JobGraph without restarting, and I actually do not understand what your 3rd question means.
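
In practice, updating such a running job (e.g. after adding registry entries) usually means stopping it with a savepoint and resubmitting the rebuilt JAR from that savepoint, roughly like this (job id, paths and entry class are illustrative):

# take a savepoint and stop the running job
bin/flink stop -p s3://my-savepoints/registry-jobs <jobId>

# resubmit the new JAR, restoring state from that savepoint
bin/flink run -s s3://my-savepoints/registry-jobs/savepoint-xxxx -c com.example.JobGenerator registry-jobs.jar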
________________________________
From: Prasanna kumar <pr...@gmail.com>
Sent: Friday, May 29, 2020 11:18
To: Yun Tang <my...@live.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Running and Maintaining Multiple Jobs

Thanks Yun for your reply.

Your thoughts on the following too?

2) We cannot afford downtime in our system. Say 5 tasks are pushed to production and we later need to add or update tasks: should we restart the cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the DB directly and create the jobs (DAGs) dynamically without restarting?

Prasanna.


On Fri 29 May, 2020, 08:04 Yun Tang, <my...@live.com>> wrote:
Hi Prasanna

Back in 2018, Flink could only restart all tasks to recover a job. That's why you would find answers saying multiple jobs might be better. However, since Flink 1.9 Flink supports restarting only the affected pipeline instead of the whole job, a.k.a. "region failover", and this failover strategy has been the default since Flink 1.10 [1].

In a nutshell, I think multiple pipelines could be acceptable now.


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#jobmanager-execution-failover-strategy

Best
Yun Tang
________________________________
From: Prasanna kumar <pr...@gmail.com>>
Sent: Friday, May 29, 2020 1:59
To: user <us...@flink.apache.org>>
Subject: Re: Running and Maintaining Multiple Jobs

Hi,

I also looked at this link. This says my approach is not good. Wanted to hear more on the same from the community.

https://stackoverflow.com/questions/52009948/multiple-jobs-or-multiple-pipelines-in-one-job-in-flink

Prasanna.

On Thu, May 28, 2020 at 11:22 PM Prasanna kumar <pr...@gmail.com>> wrote:
Hi,

I have a list of jobs that need to be run via flink.
For PoC we are implementing via JSON configuration file.
Sample JSON file
{
  "registryJobs": [
    { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
    { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
    { "inputTopic": "billingsource", "outputTopic": "billing" },
    { "inputTopic": "costs", "outputTopic": "costschannel" },
    { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
  ]
}
But in Long run we do want to have this detail in a RDBMS.
There are many other properties for Job such as transformation,  filter , rules which would be captured in DB via UI.

Flink Supports Single Execution Environment. I ended up writing JobGenerator Module which reads from this JSON and creates Jobs.


public static void generateJobs(Registry job, StreamExecutionEnvironment env) {

      Properties props = new Properties();
      props.put("bootstrap.servers", BOOTSTRAP_SERVER);
      props.put("client.id", "flink-example1");

      FlinkKafkaConsumer011<String> fkC = new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);

      DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());

      DataStream<String> mapped = stream.map( SOMEMAPCODE );

      mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props)).name("Kafka: " + job.getOutputTopic());
   }

This created 5 tasks in a single Job and it is seen this way.

[Screen Shot 2020-05-28 at 11.15.32 PM.png]

Questions

1) Is this a good way to design it? We might end up having 500 - 1000 such tasks a year or so down the line. Or is there another possible way?

2) We cannot afford downtime in our system. Say 5 tasks are pushed to production and we later need to add or update tasks: should we restart the cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the DB directly and create the jobs (DAGs) dynamically without restarting?

Thanks,
Prasanna.

Re: Running and Maintaining Multiple Jobs

Posted by Prasanna kumar <pr...@gmail.com>.
Thanks Yun for your reply.

Your thoughts on the following too?

2) We cannot afford downtime in our system. Say 5 tasks are pushed to
production and we later need to add or update tasks: should we restart the
cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the
DB directly and create the jobs (DAGs) dynamically without restarting?

Prasanna.


On Fri 29 May, 2020, 08:04 Yun Tang, <my...@live.com> wrote:

> Hi Prasanna
>
> Back in 2018, Flink could only restart all tasks to recover a job.
> That's why you would find answers saying multiple jobs might be better.
> However, since Flink 1.9 Flink supports restarting only the affected
> pipeline instead of the whole job, a.k.a. "region failover", and this
> failover strategy has been the default since Flink 1.10 [1].
>
> In a nutshell, I think multiple pipelines could be acceptable now.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#jobmanager-execution-failover-strategy
>
> Best
> Yun Tang
> ------------------------------
> *From:* Prasanna kumar <pr...@gmail.com>
> *Sent:* Friday, May 29, 2020 1:59
> *To:* user <us...@flink.apache.org>
> *Subject:* Re: Running and Maintaining Multiple Jobs
>
> Hi,
>
> I also looked at this link. This says my approach is not good. Wanted to
> hear more on the same from the community.
>
>
> https://stackoverflow.com/questions/52009948/multiple-jobs-or-multiple-pipelines-in-one-job-in-flink
>
>
> Prasanna.
>
> On Thu, May 28, 2020 at 11:22 PM Prasanna kumar <
> prasannakumarramani@gmail.com> wrote:
>
> Hi,
>
> I have a list of jobs that need to be run via flink.
> For PoC we are implementing via JSON configuration file.
> Sample JSON file
> {
>   "registryJobs": [
>     { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
>     { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
>     { "inputTopic": "billingsource", "outputTopic": "billing" },
>     { "inputTopic": "costs", "outputTopic": "costschannel" },
>     { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
>   ]
> }
> But in Long run we do want to have this detail in a RDBMS.
> There are many other properties for Job such as transformation,  filter ,
> rules which would be captured in DB via UI.
>
> Flink Supports Single Execution Environment. I ended up writing
> JobGenerator Module which reads from this JSON and creates Jobs.
>
> public static void generateJobs(Registry job, StreamExecutionEnvironment env) {
>
>       Properties props = new Properties();
>       props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>       props.put("client.id", "flink-example1");
>
>       FlinkKafkaConsumer011<String> fkC = new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);
>
>       DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());
>
>       DataStream<String> mapped = stream.map( SOMEMAPCODE );
>
>       mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props)).name("Kafka: " + job.getOutputTopic());
>    }
>
>
> This created 5 tasks in a single Job and it is seen this way.
>
> [image: Screen Shot 2020-05-28 at 11.15.32 PM.png]
>
> Questions
>
> 1) Is this a good way to design it? We might end up having 500 - 1000 such
> tasks a year or so down the line. Or is there another possible way?
>
> 2) We cannot afford downtime in our system. Say 5 tasks are pushed to
> production and we later need to add or update tasks: should we restart the
> cluster with the new job and JAR?
>
> 3) Right now the job registry is in files. Is it possible to read from the
> DB directly and create the jobs (DAGs) dynamically without restarting?
>
> Thanks,
> Prasanna.
>
>

Re: Running and Maintaining Multiple Jobs

Posted by Yun Tang <my...@live.com>.
Hi Prasanna

Back in 2018, Flink could only restart all tasks to recover a job. That's why you would find answers saying multiple jobs might be better. However, since Flink 1.9 Flink supports restarting only the affected pipeline instead of the whole job, a.k.a. "region failover", and this failover strategy has been the default since Flink 1.10 [1].

In a nutshell, I think multiple pipelines could be acceptable now.


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#jobmanager-execution-failover-strategy
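
For reference, the setting documented in [1] is jobmanager.execution.failover-strategy; a sketch of the flink-conf.yaml entry (region is the default value since 1.10):

# restart only the pipelined region containing the failed task,
# instead of restarting every task in the job
jobmanager.execution.failover-strategy: region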

Best
Yun Tang
________________________________
From: Prasanna kumar <pr...@gmail.com>
Sent: Friday, May 29, 2020 1:59
To: user <us...@flink.apache.org>
Subject: Re: Running and Maintaining Multiple Jobs

Hi,

I also looked at this link. This says my approach is not good. Wanted to hear more on the same from the community.

https://stackoverflow.com/questions/52009948/multiple-jobs-or-multiple-pipelines-in-one-job-in-flink

Prasanna.

On Thu, May 28, 2020 at 11:22 PM Prasanna kumar <pr...@gmail.com>> wrote:
Hi,

I have a list of jobs that need to be run via flink.
For PoC we are implementing via JSON configuration file.
Sample JSON file
{
  "registryJobs": [
    { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
    { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
    { "inputTopic": "billingsource", "outputTopic": "billing" },
    { "inputTopic": "costs", "outputTopic": "costschannel" },
    { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
  ]
}
But in Long run we do want to have this detail in a RDBMS.
There are many other properties for Job such as transformation,  filter , rules which would be captured in DB via UI.

Flink Supports Single Execution Environment. I ended up writing JobGenerator Module which reads from this JSON and creates Jobs.


public static void generateJobs(Registry job, StreamExecutionEnvironment env) {

      Properties props = new Properties();
      props.put("bootstrap.servers", BOOTSTRAP_SERVER);
      props.put("client.id", "flink-example1");

      FlinkKafkaConsumer011<String> fkC = new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);

      DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());

      DataStream<String> mapped = stream.map( SOMEMAPCODE );

      mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props)).name("Kafka: " + job.getOutputTopic());
   }

This created 5 tasks in a single Job and it is seen this way.

[Screen Shot 2020-05-28 at 11.15.32 PM.png]

Questions

1) Is this a good way to design it? We might end up having 500 - 1000 such tasks a year or so down the line. Or is there another possible way?

2) We cannot afford downtime in our system. Say 5 tasks are pushed to production and we later need to add or update tasks: should we restart the cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the DB directly and create the jobs (DAGs) dynamically without restarting?

Thanks,
Prasanna.

Re: Running and Maintaining Multiple Jobs

Posted by Prasanna kumar <pr...@gmail.com>.
Hi,

I also looked at the link below; it suggests my approach is not good. I wanted
to hear more on this from the community.

https://stackoverflow.com/questions/52009948/multiple-jobs-or-multiple-pipelines-in-one-job-in-flink


Prasanna.

On Thu, May 28, 2020 at 11:22 PM Prasanna kumar <
prasannakumarramani@gmail.com> wrote:

> Hi,
>
> I have a list of jobs that need to be run via flink.
> For PoC we are implementing via JSON configuration file.
> Sample JSON file
> {
>   "registryJobs": [
>     { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
>     { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
>     { "inputTopic": "billingsource", "outputTopic": "billing" },
>     { "inputTopic": "costs", "outputTopic": "costschannel" },
>     { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
>   ]
> }
> But in Long run we do want to have this detail in a RDBMS.
> There are many other properties for Job such as transformation,  filter ,
> rules which would be captured in DB via UI.
>
> Flink Supports Single Execution Environment. I ended up writing
> JobGenerator Module which reads from this JSON and creates Jobs.
>
> public static void generateJobs(Registry job, StreamExecutionEnvironment env) {
>
>       Properties props = new Properties();
>       props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>       props.put("client.id", "flink-example1");
>
>       FlinkKafkaConsumer011<String> fkC = new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);
>
>       DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());
>
>       DataStream<String> mapped = stream.map( SOMEMAPCODE );
>
>       mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props)).name("Kafka: " + job.getOutputTopic());
>    }
>
>
> This created 5 tasks in a single Job and it is seen this way.
>
> [image: Screen Shot 2020-05-28 at 11.15.32 PM.png]
>
> Questions
>
> 1) Is this a good way to design it? We might end up having 500 - 1000 such
> tasks a year or so down the line. Or is there another possible way?
>
> 2) We cannot afford downtime in our system. Say 5 tasks are pushed to
> production and we later need to add or update tasks: should we restart the
> cluster with the new job and JAR?
>
> 3) Right now the job registry is in files. Is it possible to read from the
> DB directly and create the jobs (DAGs) dynamically without restarting?
>
> Thanks,
> Prasanna.
>