Posted to user@helix.apache.org by Yi Chen <yi...@airbnb.com> on 2018/11/03 18:24:33 UTC

Re: How to abstract "dynamic" tasks in the Task Framework?

Hi Hunter,

I was able to figure out the reason why my task code stopped working after
a few minutes: I was using the embedded controller which was not configured
right, and the controller stopped working. I have a few follow-up questions:

[1] Since my tasks are dynamic (I spin up one task for each query result
from a dependency), does that mean I should not use the "recurrent
workflow"? My understanding is that a recurrent workflow is one
long-running workflow that repeats on the same data set.

[2] I am creating a one-off workflow at a regular interval, so that each
workflow includes a different task set depending on the upstream
dependency. I would like the workflow to be cleaned up before the next
run. What is the best practice for such a case? I am using the following
three lines to wait for the termination of the workflow:
* driver.pollForWorkflowState(workflowName, TIMED_OUT, FAILED, STOPPED,
ABORTED, COMPLETED);
* driver.waitToStop(workflowName, 5000);
* driver.delete(workflowName, false);
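
For context, this is roughly the shape of my cleanup code (a simplified
sketch rather than my exact code; the class name and the way the TaskDriver
is obtained are placeholders):

    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.TaskState;

    public class WorkflowCleanup {

      // Wait for the workflow to reach a final state, then remove it
      // before the next run is created.
      public static void cleanUp(TaskDriver driver, String workflowName)
          throws Exception {
        // Blocks until the workflow reaches one of the listed states
        // (or throws if the poll times out).
        driver.pollForWorkflowState(workflowName, TaskState.TIMED_OUT,
            TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED,
            TaskState.COMPLETED);

        // Ask the workflow to stop, waiting up to 5 seconds.
        driver.waitToStop(workflowName, 5000);

        // Non-forced delete; the controller removes it asynchronously.
        driver.delete(workflowName, false);
      }
    }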

I am getting many error messages like the following. I suspect this is
caused by a force deletion that's not clean?

2018-11-03 13:19:44 WARN  ZkClient:1164  Failed to delete path
> /jobserver-staging/PROPERTYSTORE/TaskRebalancer/ClusterMonitor_ListApplications!
> org.I0Itec.zkclient.exception.ZkException:
> org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode =
> Directory not empty for
> /server-staging/PROPERTYSTORE/TaskRebalancer/ClusterMonitor_ListApplications


Thanks,
Yi

On Tue, Oct 16, 2018 at 12:01 AM Hunter Lee <na...@gmail.com> wrote:

> Hi Yi,
>
> Are you calling pollForWorkflowState for TaskState.COMPLETED? Note that if
> you have set up a generic workflow, it is going to end up in COMPLETED
> state, but if you have a JobQueue (a special kind of workflow that never
> dies), it will never get to the COMPLETED state. If you're not familiar with
> how these two differ, send a quick note and I can try to explain further.
> The HelixException you're getting is probably due to the polling API timing
> out.
>
> It all depends on how you set up a workflow, but I get a feeling that your
> first task might not be finishing. I do not see any blatant
> misunderstanding as far as how to use Task Framework from what you
> described, but if I were you, I'd try to see what state these workflows,
> jobs, and tasks are actually in.
>
> You could use ZooInspector to take a quick look at Helix internals. You
> connect to the ZooKeeper address, and in the Helix's directory structure,
> you want to look at /PROPERTYSTORE/TaskRebalancer/<your workflow or job
> name>. This is where the so-called workflow and job "contexts" are
> stored - contexts contain run-time details pertaining to workflows and
> jobs. Individual job states will be listed in WorkflowContext, and
> individual task states will be in the mapField of JobContext.
>
> Let me know if this is helpful,
> Hunter
>
> On Mon, Oct 15, 2018 at 4:51 PM Yi Chen <yi...@airbnb.com> wrote:
>
>> Hi Hunter,
>>
>> Thanks for the response. I am trying to get a "Hello World" example
>> working to test the Task Framework, by following a few unit tests in the
>> source code. Here is my setup:
>>
>> [1] In the same Helix cluster, I have two nodes that participate in
>> leader election, and one of them becomes the leader. The leader is responsible
>> for starting and stopping the Task Framework workflows. Every few minutes,
>> the leader will contact an external API, to fetch a list of work items, and
>> start a workflow with one job, which contains one TaskConfig per work item.
>> [2] In the same Helix cluster, I added a few worker nodes, their sole
>> responsibility is to act as the Helix Participants to run the distributed
>> tasks, managed by the Task Framework.
>>
>> For some reason, I can only get the example working if there is only one
>> task in the workflow. If I have two or more tasks in the job, or if I have
>> multiple workflows, each containing one task, the workflows always get stuck
>> in the IN_PROGRESS state, causing a HelixException when I run
>> driver.pollForWorkflowState(workflowName). Do you know what might be missing?
>>
>> Thanks,
>> Yi
>>
>> On Thu, Oct 11, 2018 at 3:26 PM Hunter Lee <na...@gmail.com> wrote:
>>
>>> Hi Yi -
>>>
>>> If I understand your use case correctly, ideally, you'd want the number
>>> of tasks in job_2 to be dynamic, and more specifically, the number of tasks
>>> in job_2 would directly depend on the result of job_1. Here are some ideas:
>>>
>>> 1. Using Task Framework user content store to enable dynamic submission
>>> of workflows.
>>> If you aren't familiar with Task Framework's user content store, it is a
>>> feature that allows the workflow/job/task logic to store key-value pairs,
>>> given the caveat that the lifecycle of the data corresponds to the
>>> lifecycle of the workflow/job/task (the API requires you to specify the
>>> scope).
>>> For example, have job_1 temporarily store the result and job_2's task
>>> logic could use the result to spawn single-tasked workflows.
>>>
>>> 2. If you are already modeling your databases as Helix generic
>>> resources, you could make your jobs targeted with the target resource name,
>>> partition states, and a command. For targeted jobs, tasks will be
>>> dynamically generated to target the said resources. However, if you're not
>>> already modeling your DBs as Helix resources, this might not be as
>>> straightforward as it sounds.
>>>
>>> Hope this helps and perhaps others could chime in as well if they have
>>> any ideas,
>>> Hunter
>>>
>>> On Thu, Oct 11, 2018 at 11:26 AM Yi Chen <yi...@airbnb.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am new to the Task Framework and need help understanding a few
>>>> concepts. What is the best practice for jobs with dependencies, when the
>>>> number of tasks also depends on the parent job?
>>>>
>>>> For example, job_1 is to list all databases, and job_2 is to list
>>>> all tables for every database found in the result of job_1. The workflow
>>>> examples I found either define the tasks statically or start a fixed
>>>> number of tasks for a job.
>>>>
>>>> If I understand correctly, since I don't know exactly how many tasks I
>>>> need in job_2, I should make my best guess and use a larger number as the
>>>> number of partitions. For example, when I start the workflow, I can
>>>> configure job_2 to run 10 tasks, no matter how many databases exist.
>>>> If there are 100 databases in the result of job_1, the Helix Task
>>>> Framework will somehow assign 10 databases to each task. Is this correct?
>>>>
>>>> Thanks,
>>>> Yi
>>>>
>>>

Re: How to abstract "dynamic" tasks in the Task Framework?

Posted by Hunter Lee <na...@gmail.com>.
Yi,

Glad you were able to figure things out.

[1] For recurrent workflows, think of a cron job. Recurrence is in terms of
scheduling, but Task Framework does not require that the workflows run on
the same data sets (it makes no such guarantees). If you are modelling it
as a targeted job, then yes, you could say it works on the same targeted
resources every time it "recurs".
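
To make [1] concrete, a recurrent workflow is just an ordinary workflow with
a schedule attached, roughly along these lines (a rough sketch; I am assuming
ScheduleConfig.recurringFromNow and Workflow.Builder#setScheduleConfig are
available in your Helix version, and the job name and 10-minute interval are
placeholders):

    import java.util.concurrent.TimeUnit;

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.ScheduleConfig;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.Workflow;

    public class RecurrentWorkflowSketch {

      // Submit a workflow that the controller re-schedules every 10 minutes.
      // Recurrence only affects scheduling; what the job's tasks work on
      // each run is entirely up to the job config you add.
      public static void submit(TaskDriver driver, String workflowName,
          JobConfig.Builder job) {
        ScheduleConfig schedule =
            ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 10);
        Workflow.Builder builder = new Workflow.Builder(workflowName);
        builder.setScheduleConfig(schedule);
        builder.addJob("ListApplications", job); // placeholder job name
        driver.start(builder.build());
      }
    }
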
[2] The recommended way for workflows to be cleaned up is to set the
"expiry" field in the workflow's config. Once a workflow hits a terminal
state, the expiry starts to apply, and after it elapses the Controller will
automatically remove the workflow.
Note that for workflows, the terminal states (states that are final; once a
workflow is in such a state, it will never move to a different state) are [
TIMED_OUT, FAILED, COMPLETED ]. STOPPED isn't terminal because a stopped
workflow can be resumed.

What you have implemented (the 3 lines) should also work, but it incurs the
cost of another thread that has to wait on the workflow statuses, so expiry
might be the way to go.
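
A minimal sketch of the expiry approach (assuming Workflow.Builder#setExpiry
takes a millisecond value in your version; the 5-minute expiry and the job
name are placeholders):

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.TaskDriver;
    import org.apache.helix.task.Workflow;

    public class ExpiringWorkflowSketch {

      // Submit a one-off workflow that the controller removes on its own
      // after it has sat in a terminal state for 5 minutes.
      public static void submit(TaskDriver driver, String workflowName,
          JobConfig.Builder job) {
        Workflow.Builder builder = new Workflow.Builder(workflowName);
        builder.setExpiry(5 * 60 * 1000L); // assumed milliseconds
        builder.addJob("ListApplications", job); // placeholder job name
        driver.start(builder.build());
      }
    }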

Lastly, the warn log you see from ZkClient is mostly harmless. I do not
know what version you are using, but a log fix was checked in recently
and should be in the latest open-source release of Helix. Note that
driver.delete(workflowName, false) is not a force deletion (since the flag
is set to false) and is asynchronous; the controller will get to it in the
next round of its pipeline run.

Hope this helps,
Hunter

On Sat, Nov 3, 2018 at 11:25 AM Yi Chen <yi...@airbnb.com> wrote:

> Hi Hunter,
>
> I was able to figure out the reason why my task code stopped working after
> a few minutes: I was using the embedded controller which was not configured
> right, and the controller stopped working. I have a few follow-up questions:
>
> [1] Since my tasks are dynamic (I spin up one task for each query result
> from a dependency), does that mean I should not use the "recurrent
> workflow"? My understanding is that a recurrent workflow is one
> long-running workflow that repeats on the same data set.
>
> [2] I am creating a one-off workflow at a regular interval, so that each
> workflow includes a different task set depending on the upstream
> dependency. I would like the workflow to be cleaned up before the next
> run. What is the best practice for such a case? I am using the following
> three lines to wait for the termination of the workflow:
> * driver.pollForWorkflowState(workflowName, TIMED_OUT, FAILED, STOPPED,
> ABORTED, COMPLETED);
> * driver.waitToStop(workflowName, 5000);
> * driver.delete(workflowName, false);
>
> I am getting many error messages like the following. I suspect this is
> caused by a force deletion that's not clean?
>
> 2018-11-03 13:19:44 WARN  ZkClient:1164  Failed to delete path
>> /jobserver-staging/PROPERTYSTORE/TaskRebalancer/ClusterMonitor_ListApplications!
>> org.I0Itec.zkclient.exception.ZkException:
>> org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode =
>> Directory not empty for
>> /server-staging/PROPERTYSTORE/TaskRebalancer/ClusterMonitor_ListApplications
>
>
> Thanks,
> Yi
>
> On Tue, Oct 16, 2018 at 12:01 AM Hunter Lee <na...@gmail.com> wrote:
>
>> Hi Yi,
>>
>> Are you calling pollForWorkflowState for TaskState.COMPLETED? Note that
>> if you have set up a generic workflow, it is going to end up in COMPLETED
>> state, but if you have a JobQueue (a special kind of workflow that never
>> dies), it will never get to the COMPLETED state. If you're not familiar with
>> how these two differ, send a quick note and I can try to explain further.
>> The HelixException you're getting is probably due to the polling API timing
>> out.
>>
>> It all depends on how you set up a workflow, but I get a feeling that
>> your first task might not be finishing. I do not see any blatant
>> misunderstanding as far as how to use Task Framework from what you
>> described, but if I were you, I'd try to see what state these workflows,
>> jobs, and tasks are actually in.
>>
>> You could use ZooInspector to take a quick look at Helix internals. You
>> connect to the ZooKeeper address, and in the Helix's directory structure,
>> you want to look at /PROPERTYSTORE/TaskRebalancer/<your workflow or job
>> name>. This is where the so-called workflow and job "contexts" are
>> stored - contexts contain run-time details pertaining to workflows and
>> jobs. Individual job states will be listed in WorkflowContext, and
>> individual task states will be in the mapField of JobContext.
>>
>> Let me know if this is helpful,
>> Hunter
>>
>> On Mon, Oct 15, 2018 at 4:51 PM Yi Chen <yi...@airbnb.com> wrote:
>>
>>> Hi Hunter,
>>>
>>> Thanks for the response. I am trying to get a "Hello World" example
>>> working to test the Task Framework, by following a few unit tests in the
>>> source code. Here is my setup:
>>>
>>> [1] In the same Helix cluster, I have two nodes that participate in
>>> leader election, and one of them becomes the leader. The leader is responsible
>>> for starting and stopping the Task Framework workflows. Every few minutes,
>>> the leader will contact an external API, to fetch a list of work items, and
>>> start a workflow with one job, which contains one TaskConfig per work item.
>>> [2] In the same Helix cluster, I added a few worker nodes, their sole
>>> responsibility is to act as the Helix Participants to run the distributed
>>> tasks, managed by the Task Framework.
>>>
>>> For some reason, I can only get the example working if there is only one
>>> task in the workflow. If I have two or more tasks in the job, or if I have
>>> multiple workflows, each containing one task, the workflows always get stuck
>>> in the IN_PROGRESS state, causing a HelixException when I run
>>> driver.pollForWorkflowState(workflowName). Do you know what might be missing?
>>>
>>> Thanks,
>>> Yi
>>>
>>> On Thu, Oct 11, 2018 at 3:26 PM Hunter Lee <na...@gmail.com> wrote:
>>>
>>>> Hi Yi -
>>>>
>>>> If I understand your use case correctly, ideally, you'd want the number
>>>> of tasks in job_2 to be dynamic, and more specifically, the number of tasks
>>>> in job_2 would directly depend on the result of job_1. Here are some ideas:
>>>>
>>>> 1. Using Task Framework user content store to enable dynamic submission
>>>> of workflows.
>>>> If you aren't familiar with Task Framework's user content store, it is
>>>> a feature that allows the workflow/job/task logic to store key-value pairs,
>>>> given the caveat that the lifecycle of the data corresponds to the
>>>> lifecycle of the workflow/job/task (the API requires you to specify the
>>>> scope).
>>>> For example, have job_1 temporarily store the result and job_2's task
>>>> logic could use the result to spawn single-tasked workflows.
>>>>
>>>> 2. If you are already modeling your databases as Helix generic
>>>> resources, you could make your jobs targeted with the target resource name,
>>>> partition states, and a command. For targeted jobs, tasks will be
>>>> dynamically generated to target the said resources. However, if you're not
>>>> already modeling your DBs as Helix resources, this might not be as
>>>> straightforward as it sounds.
>>>>
>>>> Hope this helps and perhaps others could chime in as well if they have
>>>> any ideas,
>>>> Hunter
>>>>
>>>> On Thu, Oct 11, 2018 at 11:26 AM Yi Chen <yi...@airbnb.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am new to the Task Framework and need help understanding a few
>>>>> concepts. What is the best practice for jobs with dependencies, when the
>>>>> number of tasks also depends on the parent job?
>>>>>
>>>>> For example, job_1 is to list all databases, and job_2 is to list
>>>>> all tables for every database found in the result of job_1. The workflow
>>>>> examples I found either define the tasks statically or start a fixed
>>>>> number of tasks for a job.
>>>>>
>>>>> If I understand correctly, since I don't know exactly how many tasks I
>>>>> need in job_2, I should make my best guess and use a larger number as the
>>>>> number of partitions. For example, when I start the workflow, I can
>>>>> configure job_2 to run 10 tasks, no matter how many databases exist.
>>>>> If there are 100 databases in the result of job_1, the Helix Task
>>>>> Framework will somehow assign 10 databases to each task. Is this correct?
>>>>>
>>>>> Thanks,
>>>>> Yi
>>>>>
>>>>