Posted to user@flink.apache.org by black chase <ch...@gmail.com> on 2019/05/23 15:41:56 UTC

How many task managers to launch for a job?

Hi,

I am redesigning the scheduler of the JobManager to place the tasks of a job
across TaskManagers according to a scheduling policy.

I am reading the FLIP-6 proposal and found that the common case is "one
TaskManager launches one slot" and "one Flink cluster serves one job". But
I did not find how many TaskManagers to launch on a computing node. Is
there any common practice for this?

-- 
Best Regards!
Pengcheng Duan

Re: How many task managers to launch for a job?

Posted by black chase <ch...@gmail.com>.
OK, I see. Thank you very much Song
Best Regards
Chase


Re: How many task managers to launch for a job?

Posted by Xintong Song <to...@gmail.com>.
Well, it depends on how many resources are needed for one pipeline of your
job and how many resources are configured for each TaskExecutor. In
addition, the resources of each TaskManager also depend on the job's
resource needs and your environment. So having one slot for each
TaskManager is a simple choice, because it avoids tuning these two
related factors at the same time.

Thank you~

Xintong Song




Re: How many task managers to launch for a job?

Posted by black chase <ch...@gmail.com>.
Hi Song,
You said "In that way, the total slots (or number of TaskManagers if you
configure one slot for each TaskManager)". Do you mean that one TaskManager
contains one slot?
Do you have any experience with how many slots to configure for one TaskManager?
I read FLIP-6, which says "For the sake of simplicity, the following talks
about “slots”, but one can think simply of “TaskManager” instead, for the
common case of a one-slot TaskManager."
It seems the common practice is to have one slot per TaskManager.

Best,
Chase



Re: How many task managers to launch for a job?

Posted by black chase <ch...@gmail.com>.
Yes, true. I am trying to figure out how the TaskManagers are distributed
across physical machines by Mesos and YARN. Maybe I should start a new
thread for help.
Thank you Song
Best,
Pengcheng


Re: How many task managers to launch for a job?

Posted by Xintong Song <to...@gmail.com>.
As far as I know, Flink does not have any requirements on how the
TaskManagers are distributed across physical machines. So I think it really
depends on the scheduling policy of the Mesos cluster. I'm not an expert on
Mesos, so correct me if I am wrong.

Thank you~

Xintong Song




Re: How many task managers to launch for a job?

Posted by black chase <ch...@gmail.com>.
Hi Song,
Thank you for the clarification.
Now I know TaskManagers are allocated automatically. Yet I am still not
very clear on how the TMs are allocated.
I'm guessing the allocation process would be:
On the job side, I have a job in which each operator has parallelism=5. Since
one TaskManager has one slot, this job would need 5 TaskManagers, because
one pipeline needs one slot according to the task scheduling policy.
On the Mesos side, let's say there are currently no available TaskManagers.
Then Mesos would spawn 5 new TaskManagers for this job. Is that right?
If my guess is right, then let's say we have 4 physical computing
nodes for the Flink TaskManagers. How would Mesos place the 5 new
TaskManagers onto the 4 physical computing nodes? Is it just done in a
round-robin fashion?
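
To make the question concrete, here is a small sketch of what a round-robin placement of 5 TaskManagers over 4 nodes would produce. It only illustrates the hypothetical policy being asked about; it does not describe what Mesos or YARN actually do, and the class and method names are made up for this example.

```java
import java.util.Arrays;

/** Hypothetical round-robin placement, only to illustrate the question above. */
final class RoundRobinPlacement {

    /** Returns how many TaskManagers land on each node when they are dealt out one node at a time. */
    static int[] place(int taskManagers, int nodes) {
        if (nodes <= 0) {
            throw new IllegalArgumentException("nodes must be positive");
        }
        int[] perNode = new int[nodes];
        for (int tm = 0; tm < taskManagers; tm++) {
            perNode[tm % nodes]++; // the next TaskManager goes to the next node, wrapping around
        }
        return perNode;
    }

    public static void main(String[] args) {
        // 5 TaskManagers over 4 nodes
        System.out.println(Arrays.toString(place(5, 4))); // prints [2, 1, 1, 1]
    }
}
```

Under such a policy one node would host two TaskManagers and the others one each; whether a real cluster manager behaves this way depends on its own scheduling policy.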
Kind regards
Chase




Re: How many task managers to launch for a job?

Posted by Xintong Song <to...@gmail.com>.
Hi black,

If you are running Flink on YARN or Mesos, Flink will automatically
allocate resources and launch new TaskManagers as needed.

If you are using Flink standalone mode, then the easiest way is to enable
slot sharing and put all the vertices into the same group (which is the
default). That way, the total number of slots (or the number of
TaskManagers, if you configure one slot for each TaskManager) needed to run
the job is the maximum parallelism of the job graph vertices. Further
information on slot sharing can be found here:
<https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html#task-slots-and-resources>
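
The arithmetic behind the standalone case can be sketched as follows. This is a rough estimate, assuming all vertices share one slot sharing group; the class and method names are made up for illustration, and the slots-per-TaskManager value stands for whatever is set through `taskmanager.numberOfTaskSlots`.

```java
import java.util.Arrays;

/** Back-of-the-envelope slot arithmetic for a standalone cluster (illustrative only). */
final class SlotEstimate {

    /** With one shared slot sharing group, the job needs as many slots as its highest vertex parallelism. */
    static int requiredSlots(int[] vertexParallelisms) {
        return Arrays.stream(vertexParallelisms).max().orElse(0);
    }

    /** Number of TaskManagers needed to offer that many slots, rounded up. */
    static int requiredTaskManagers(int slots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int[] parallelisms = {5, 5, 5}; // hypothetical job: three vertices, parallelism 5 each
        int slots = requiredSlots(parallelisms);
        System.out.println("slots needed: " + slots); // 5
        System.out.println("TaskManagers, 1 slot each: " + requiredTaskManagers(slots, 1)); // 5
        System.out.println("TaskManagers, 2 slots each: " + requiredTaskManagers(slots, 2)); // 3
    }
}
```

With one slot per TaskManager the two numbers coincide, which is why the thread treats "slots" and "TaskManagers" interchangeably in that setup.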

Thank you~

Xintong Song




Re: Re: How can i just implement a crontab function using flink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I tried, but MyProcessWindowFunction is still not triggered when there is no event in the window.

Any insight on this?


source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Map>() {
    // The watermark trails the wall clock by 10 seconds.
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 10000);
    }

    // Each element is stamped with its arrival time.
    @Override
    public long extractTimestamp(Map map, long l) {
        return System.currentTimeMillis();
    }
})
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new MyProcessWindowFunction());



wanglei2@geekplus.com.cn
 
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
There is the concept of a periodic watermark assigner; you can use that
if you are working with event time.

On Fri, May 24, 2019 at 1:51 PM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:

I want to do something once every minute.

Using a TumblingWindow, the function will not be triggered if no message is received during that minute. But I still need to execute the function.

How can I implement it?



wanglei2@geekplus.com.cn
 


-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
e-mail :puneet.kinra@customercentria.com


Re: Unable to restore state value after job failed using RocksDBStateBackend

Posted by Simon Su <ba...@163.com>.
Hi Lei Wang


Actually, that will not work. Recovery from a checkpoint uses the job ID to locate the snapshot directory, but when you restart the job in IntelliJ without setting any configuration, a new job ID is generated and the run is treated as a new job, so you get null state every time. You can follow https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
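
One way to picture this point is the small lookup sketch below. It assumes the directory layout `<checkpoint-dir>/<job-id>/chk-<n>` and uses made-up job IDs and class names; it is an illustration of why a fresh job ID finds nothing, not Flink's actual recovery code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Illustrative lookup only; this is not Flink's recovery code. */
final class CheckpointLookup {

    /** Parses N out of a directory named chk-N. */
    static long checkpointNumber(Path dir) {
        return Long.parseLong(dir.getFileName().toString().substring("chk-".length()));
    }

    /** Returns the highest-numbered chk-N directory under root/jobId, if one exists. */
    static Optional<Path> latestCheckpoint(Path root, String jobId) throws IOException {
        Path jobDir = root.resolve(jobId);
        if (!Files.isDirectory(jobDir)) {
            return Optional.empty(); // nothing was ever written under this job ID
        }
        try (Stream<Path> children = Files.list(jobDir)) {
            return children
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .max(Comparator.comparingLong(CheckpointLookup::checkpointNumber));
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("checkpoints");
        // Hypothetical job IDs: the first run wrote a checkpoint, the IDE restart got a fresh ID.
        Files.createDirectories(root.resolve("job-of-first-run").resolve("chk-7"));
        System.out.println(latestCheckpoint(root, "job-of-first-run").isPresent());  // true
        System.out.println(latestCheckpoint(root, "job-after-restart").isPresent()); // false
    }
}
```

The second lookup comes back empty because the new ID points at a directory that was never written, which matches the "null state every time" symptom described above.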


Thanks,
Simon



Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I just start and cancel it in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me read up on savepoints.

Thanks,
Lei Wang




wanglei2@geekplus.com.cn
 

Re: Unable to restore state value after job failed using RocksDBStateBackend

Posted by Stephan Ewen <se...@apache.org>.
If you manually cancel and restart the job, state is only carried forward
if you use a savepoint.
Can you check if that is what you are doing?


Re:Unable to restore state value after job failed using RocksDBStateBackend

Posted by Simon Su <ba...@163.com>.

Hi wanglei


Can you post how you restart the job?


Thanks,
Simon

Unable to restore state value after job failed using RocksDBStateBackend

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
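import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // The original post does not show how "log" is declared; an SLF4J logger is assumed here.
    private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);

    // Keyed state handle, obtained in open().
    private transient ValueState<Tuple2<Long, Long>> state;

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();

        // value() returns null when no state exists yet for the current key.
        if (stateValue == null) {
            log.info("##########  initialize");
            stateValue = Tuple2.of(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                "avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}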



Every time I restart the job, stateValue is still null.




wanglei2@geekplus.com.cn
 

Re: How to trigger the window function even there's no message input in this window?

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

As far as I know, this is currently impossible.

You can work around this by implementing your own custom post-processing operator/flatMap function that would:
- track the output of the window operator
- register a processing-time timer with some desired timeout
- every time the processing-time timer fires, check whether the window operator has emitted something in the last X seconds; if not, emit some default element
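
The three steps above can be sketched in plain Java, independent of the Flink API. This is only a sketch of the decision logic under stated assumptions: IdleFallback, onWindowOutput and onTimer are hypothetical names, and the timestamps are passed in by the caller. In a real job, onWindowOutput would presumably be called from processElement and onTimer from the timer callback of a KeyedProcessFunction placed after the window operator, with the returned timestamp passed to ctx.timerService().registerProcessingTimeTimer(...). As far as I know, timers are only available on keyed streams, so the output of windowAll would first need a keyBy on a constant key.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: remembers when the window operator last emitted. */
final class IdleFallback<T> {
    private final long timeoutMillis;  // the "X seconds" from the list above
    private final T defaultElement;    // emitted when the window stayed silent
    private long lastEmitMillis;       // time of the most recent window output

    IdleFallback(long timeoutMillis, T defaultElement, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.defaultElement = defaultElement;
        this.lastEmitMillis = startMillis;
    }

    /** Step 1: track every element the window operator emits and forward it. */
    T onWindowOutput(T element, long nowMillis) {
        lastEmitMillis = nowMillis;
        return element;
    }

    /**
     * Step 3: on timer firing, add the default element to out if nothing was
     * emitted within the timeout. Returns the timestamp at which the next
     * timer should be registered (step 2).
     */
    long onTimer(long nowMillis, List<T> out) {
        if (nowMillis - lastEmitMillis >= timeoutMillis) {
            out.add(defaultElement);
        }
        return nowMillis + timeoutMillis;
    }
}

final class IdleFallbackDemo {
    public static void main(String[] args) {
        IdleFallback<String> fallback = new IdleFallback<>(10_000L, "EMPTY_WINDOW", 0L);
        List<String> out = new ArrayList<>();

        out.add(fallback.onWindowOutput("window-result-1", 4_000L));
        long next = fallback.onTimer(10_000L, out); // output seen 6 s ago: nothing added
        fallback.onTimer(next, out);                // silent for 16 s: default added

        System.out.println(out); // prints [window-result-1, EMPTY_WINDOW]
    }
}
```

The timer is re-armed on every firing, so a default element is produced once per timeout period for as long as the window operator stays silent.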

Piotrek

> On 14 Jun 2019, at 12:08, wanglei2@geekplus.com.cn wrote:
> 
> 
> windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());
> How can I trigger MyProcessWindowFunction even when there's no input during this window?
> 
> wanglei2@geekplus.com.cn <ma...@geekplus.com.cn>

How to trigger the window function even there's no message input in this window?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());

How can I trigger MyProcessWindowFunction even when there's no input during this window?



wanglei2@geekplus.com.cn


Re: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks. Let me have a try


wanglei2@geekplus.com.cn
 
From: Yang Wang
Date: 2019-05-28 09:47
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?
Hi, wanglei

You could use the flink distributed cache to register some config files and then access them in your task.

1. Register a cached file

StreamExecutionEnvironment.registerCachedFile(inputFile.toString(), "test_data", false);

2. Access the file in your task

final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();

wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote on Sun, May 26, 2019 at 12:06 AM:

When starting a single-node Java application, I can add some config files to it.

How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





wanglei2@geekplus.com.cn

Re: How can I add config file as classpath in taskmgr node when submitting a flink job?

Posted by Yang Wang <da...@gmail.com>.
Hi, wanglei

You could use the flink distributed cache to register some config
files and then access them in your task.

1. Register a cached file

StreamExecutionEnvironment.registerCachedFile(inputFile.toString(),
"test_data", false);

2. Access the file in your task

final Path testFile =
getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
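
Once the task has the local path, the file can be read like any other file. Below is a minimal sketch, assuming the config file is in Java .properties format (the thread does not say which format is used). CachedConfigLoader is a hypothetical helper, not part of Flink; it would typically be called from the open() method of a rich function with the path obtained in step 2.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical helper: loads a .properties file from a local path. */
final class CachedConfigLoader {
    static Properties load(Path configFile) throws IOException {
        Properties props = new Properties();
        // try-with-resources closes the reader even if parsing fails
        try (Reader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }
}

final class CachedConfigLoaderDemo {
    public static void main(String[] args) throws IOException {
        // Stand-in for the file that the distributed cache would provide.
        Path tmp = Files.createTempFile("test_data", ".properties");
        Files.write(tmp, "db.host=localhost\n".getBytes(StandardCharsets.UTF_8));

        Properties props = CachedConfigLoader.load(tmp);
        System.out.println(props.getProperty("db.host")); // prints localhost

        Files.delete(tmp);
    }
}
```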


wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote on Sun, May 26, 2019 at 12:06 AM:

>
> When starting a single-node Java application, I can add some config files
> to it.
>
> How can I implement this when submitting a Flink job? The config file needs
> to be read on the TaskManager node and used to initialize some classes.
>
>
>
> ------------------------------
> wanglei2@geekplus.com.cn
>

How can I add config file as classpath in taskmgr node when submitting a flink job?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
When starting a single-node Java application, I can add some config files to it.

How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





wanglei2@geekplus.com.cn

Re: Re: How can i just implement a crontab function using flink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks, got it.



wanglei2@geekplus.com.cn
 
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
There is the concept of a periodic watermarker; you can use that
if you are working with event time.

On Fri, May 24, 2019 at 1:51 PM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:

I want to do something every minute.

Using TumblingWindow, the function will not be triggered if no message is received during that minute. But I still need to execute the function.

How can I implement it?



wanglei2@geekplus.com.cn
 


-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
e-mail :puneet.kinra@customercentria.com


Re: How can i just implement a crontab function using flink?

Posted by Puneet Kinra <pu...@customercentria.com>.
There is the concept of a periodic watermarker; you can use that
if you are working with event time.

On Fri, May 24, 2019 at 1:51 PM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

>
> I want to do something every minute.
>
> Using TumblingWindow, the function will not be triggered if no
> message is received during that minute. But I still need to execute the
> function.
>
> How can I implement it?
>
> ------------------------------
> wanglei2@geekplus.com.cn
>
>
>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: Re: How can i just implement a crontab function using flink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks, it's an alternative solution.



wanglei2@geekplus.com.cn
 
From: Jörn Franke
Date: 2019-05-24 16:31
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
Just send a dummy event from the source system every minute.

On 24.05.2019 at 10:20, "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> wrote:


I want to do something every minute.

Using TumblingWindow, the function will not be triggered if no message is received during that minute. But I still need to execute the function.

How can I implement it?



wanglei2@geekplus.com.cn
 

Re: How can i just implement a crontab function using flink?

Posted by Jörn Franke <jo...@gmail.com>.
Just send a dummy event from the source system every minute.

> On 24.05.2019 at 10:20, "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> wrote:
> 
> 
> I want to do something every minute.
> 
> Using TumblingWindow, the function will not be triggered if no message is received during that minute. But I still need to execute the function.
> 
> How can I implement it?
> 
> wanglei2@geekplus.com.cn
>  

How can i just implement a crontab function using flink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I want to do something every minute.

Using TumblingWindow, the function will not be triggered if no message is received during that minute. But I still need to execute the function.

How can I implement it?



wanglei2@geekplus.com.cn