Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/07/27 17:24:27 UTC

[GitHub] [incubator-seatunnel] ic4y opened a new issue, #2279: [ST-Engine][Design] TaskExecutionService and Task related design

ic4y opened a new issue, #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279

   ### Search before asking
   
   - [X] I have searched the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) issues and found no similar feature request.
   
   
   ### Description
   
   TaskExecutionServer is a service that executes Tasks; one instance runs on each node. It receives TaskGroups from the JobMaster and runs the Tasks they contain. It also maintains a TaskID -> TaskContext mapping, and the concrete operations on a Task are encapsulated in its TaskContext. Each Task holds an OperationService internally, which means a Task can make remote calls to, and communicate with, other Tasks or the JobMaster through the OperationService.
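A minimal sketch of the per-node bookkeeping described above. All type and method names here (`TaskContext.cancel`, `deployTaskGroup`, `cancelTask`) are hypothetical illustrations rather than SeaTunnel's actual API, and both the OperationService and the scheduling of Tasks onto threads are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified types; the real SeaTunnel classes may differ.
interface Task { long taskId(); }

// Wraps the operations the service performs on one Task (only cancel is shown).
final class TaskContext {
    private final Task task;
    private volatile boolean cancelled;
    TaskContext(Task task) { this.task = task; }
    Task task() { return task; }
    void cancel() { cancelled = true; }
    boolean isCancelled() { return cancelled; }
}

// A TaskGroup is the unit the JobMaster ships to one node; all its Tasks run there.
final class TaskGroup {
    final List<Task> tasks;
    TaskGroup(List<Task> tasks) { this.tasks = tasks; }
}

// One instance per node: receives TaskGroups and keeps the TaskID -> TaskContext map.
final class TaskExecutionService {
    private final Map<Long, TaskContext> contexts = new ConcurrentHashMap<>();

    void deployTaskGroup(TaskGroup group) {
        for (Task t : group.tasks) {
            contexts.put(t.taskId(), new TaskContext(t));
            // Handing t to an executor thread is omitted in this sketch.
        }
    }

    // Operations on a Task go through its TaskContext, looked up by TaskID.
    boolean cancelTask(long taskId) {
        TaskContext ctx = contexts.get(taskId);
        if (ctx == null) return false;
        ctx.cancel();
        return true;
    }

    TaskContext context(long taskId) { return contexts.get(taskId); }
}
```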
   
   TaskGroup design:
        All Tasks in a TaskGroup run on the same node.
   <img width="432" alt="image" src="https://user-images.githubusercontent.com/83933160/181298022-3cd6a80a-3c14-427c-a19b-48cbc1e7c0d8.png">
   
   An optimization point:
   	The data channel between Tasks within the same TaskGroup uses a local queue, while the data channel between different TaskGroups may need a distributed queue (Hazelcast Ringbuffer), because different TaskGroups may be executed on different nodes.
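The channel choice can be sketched as follows, assuming a hypothetical `DataChannel` interface. Only the local, queue-backed variant is implemented; the cross-TaskGroup variant would be backed by a distributed queue such as a Hazelcast Ringbuffer and is only named here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical channel abstraction; names are illustrative, not SeaTunnel's API.
interface DataChannel<T> {
    boolean offer(T record);   // non-blocking write; false if the channel is full
    T poll();                  // non-blocking read; null if the channel is empty
}

// Intra-TaskGroup channel: both ends live in the same JVM, so an in-memory queue suffices.
final class LocalChannel<T> implements DataChannel<T> {
    private final BlockingQueue<T> queue;
    LocalChannel(int capacity) { this.queue = new ArrayBlockingQueue<>(capacity); }
    public boolean offer(T record) { return queue.offer(record); }
    public T poll() { return queue.poll(); }
}

enum ChannelKind { LOCAL_QUEUE, DISTRIBUTED_QUEUE }

final class ChannelPlanner {
    // Tasks in the same TaskGroup always share a node, so a local queue is safe.
    // Across TaskGroups the peer may be on another node, so a distributed queue
    // (a Hazelcast Ringbuffer in the proposal) would be required instead.
    static ChannelKind kindFor(long upstreamGroupId, long downstreamGroupId) {
        return upstreamGroupId == downstreamGroupId
                ? ChannelKind.LOCAL_QUEUE
                : ChannelKind.DISTRIBUTED_QUEUE;
    }
}
```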
   
   Task design:
           One of the most important methods of a Task is call(); the executor drives a Task by invoking its call() repeatedly. call() returns a ProgressState, from which the executor determines whether the Task has finished or whether call() needs to be invoked again, as shown below.
             
   <img width="301" alt="image" src="https://user-images.githubusercontent.com/83933160/181298463-596825a0-3cd3-49fc-9fe4-e52ee3dcf27e.png">
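A minimal sketch of this contract, assuming a simplified three-value `ProgressState` (the real enum may have different or additional values). An executor that owns a thread simply keeps calling `call()` until the Task reports that it is done; `CountingTask` and `ExclusiveExecutor` are hypothetical names.

```java
// Hypothetical, simplified progress signal returned by call().
enum ProgressState {
    MADE_PROGRESS, // did some work; call again
    NO_PROGRESS,   // nothing to do right now; call again later
    DONE;          // finished; never call again

    boolean isDone() { return this == DONE; }
}

// Minimal Task contract: the executor drives the Task by calling call() repeatedly.
interface Task {
    ProgressState call();
}

// Toy Task: processes a fixed number of records, one per call().
final class CountingTask implements Task {
    private final int total;
    private int emitted = 0;
    CountingTask(int total) { this.total = total; }

    @Override
    public ProgressState call() {
        if (emitted >= total) return ProgressState.DONE;
        emitted++;                          // "process" one record
        return ProgressState.MADE_PROGRESS;
    }

    int emitted() { return emitted; }
}

// Dedicated-thread executor: keeps calling call() until the Task reports DONE.
final class ExclusiveExecutor {
    static int runToCompletion(Task task) {
        int calls = 0;
        while (true) {
            calls++;
            if (task.call().isDone()) return calls;
        }
    }
}
```

For `new CountingTask(3)`, `runToCompletion` invokes `call()` four times: three calls that make progress and a final one that returns `DONE`. A real executor would likely back off when a Task reports `NO_PROGRESS` instead of spinning.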
   
    Thread Share optimization:
          Background: when a large number of small tables are synchronized, a large number of Tasks is generated. If each Task gets its own thread, running that many threads wastes resources; if one thread can run multiple Tasks, the situation improves greatly. But how can one thread execute multiple Tasks at the same time? Because each Task is driven internally by repeated calls to call(), a thread can simply invoke call() on each of the Tasks it is responsible for, in turn, as shown below.
   
   <img width="337" alt="image" src="https://user-images.githubusercontent.com/83933160/181300464-3363ef7c-b261-4f35-a5af-b44f45b52b63.png">
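Sharing a thread then amounts to a round-robin loop. The sketch below is a hypothetical single-threaded executor: it takes a Task from the head of a queue, calls `call()` once, and puts the Task back at the tail unless it reports `DONE`. `StepTask` is a toy Task that finishes after a fixed number of calls; none of these names come from SeaTunnel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

enum ProgressState { MADE_PROGRESS, NO_PROGRESS, DONE }

interface Task {
    String name();
    ProgressState call();
}

// Toy Task that reports DONE on its n-th call.
final class StepTask implements Task {
    private final String name;
    private int remaining;
    StepTask(String name, int steps) { this.name = name; this.remaining = steps; }
    public String name() { return name; }
    public ProgressState call() {
        return --remaining > 0 ? ProgressState.MADE_PROGRESS : ProgressState.DONE;
    }
}

// One thread drives many Tasks by calling each Task's call() once per turn.
final class SharedThreadExecutor {
    private final Deque<Task> queue = new ArrayDeque<>();
    private final List<String> trace = new ArrayList<>(); // order in which call() ran

    void submit(Task t) { queue.addLast(t); }

    // Runs on the calling thread until every Task reports DONE.
    void runAll() {
        while (!queue.isEmpty()) {
            Task t = queue.pollFirst();
            trace.add(t.name());
            if (t.call() != ProgressState.DONE) {
                queue.addLast(t); // not finished: back to the end of the line
            }
        }
    }

    List<String> trace() { return trace; }
}
```

With `StepTask("A", 2)` and `StepTask("B", 1)` submitted in that order, the calls run in the order A, B, A. Note that one slow `call()` in this loop blocks every Task queued behind it, which is the problem discussed next.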
   
   
   This introduces a new problem: if a single call() of one Task takes a very long time, it occupies the thread for that whole time and severely delays every other Task sharing the thread.
   
   For now, I have two candidate solutions to this problem:
   
   Option 1: Marking Thread Share
   
   Add a marker to the Task that indicates whether it supports thread sharing; each Task implementation sets this marker itself. Tasks that support sharing are executed on a shared thread, and Tasks that do not are each executed on an exclusive thread.
   
    Whether a Task supports thread sharing is decided by its implementer, based on the execution time of its call() method: if every invocation of call() completes within milliseconds, the Task can be marked as supporting thread sharing.
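Option 1 can be sketched as a marker method on the Task interface that the executor uses to route Tasks. The method name `isThreadShareable` and the `Placement` helper are hypothetical; the default is deliberately conservative (not shareable), so a Task only joins a shared thread if its implementer opts in.

```java
import java.util.ArrayList;
import java.util.List;

enum ProgressState { MADE_PROGRESS, DONE }

interface Task {
    ProgressState call();

    // Hypothetical marker: the implementer opts in when every call() is known to be short (ms level).
    default boolean isThreadShareable() { return false; }
}

// Splits Tasks into those driven by a shared thread and those that get their own thread.
final class Placement {
    final List<Task> shared = new ArrayList<>();    // driven round-robin by one shared thread
    final List<Task> exclusive = new ArrayList<>(); // each gets a dedicated thread

    static Placement of(List<Task> tasks) {
        Placement p = new Placement();
        for (Task t : tasks) {
            (t.isThreadShareable() ? p.shared : p.exclusive).add(t);
        }
        return p;
    }
}
```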
   
   Option 2: Dynamic Thread Share
   
   Option 1 has a fundamental problem: the execution time of call() is often not fixed, and a Task itself does not know in advance how long its call() will take, because the processing stage, the amount of data, and other factors all affect it. It is therefore not appropriate to mark such a Task statically as shareable or not. If it is marked as shareable and one call() runs for a long time, the latency of the other Tasks sharing the current thread becomes very high; if it is marked as not shareable, the resource waste is still not solved.
   
    Thread sharing can therefore be made dynamic: a group of Tasks is executed by a thread pool (number of Tasks >> number of threads). While thread1 is executing, if Task1's call() runs longer than a configured threshold (e.g. 100 ms), another thread, thread2, is taken from the pool to execute call() of the next Task, Task2. This guarantees that other Tasks are not heavily delayed by Task1's long call(). If Task2's call() completes within the timeout, Task2 is put back at the end of the task queue, and thread2 goes on to take Task3 from the queue and execute its call(). When Task1's call() finally finishes, thread1 is returned to the pool and Task1 is marked as having timed out once. Once a Task's call() has timed out a certain number of times, the Task is removed from the shared task queue and executed on an exclusive thread.
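The decision rules of Option 2 can be sketched as below. This is a hypothetical, single-threaded model of the bookkeeping only: `shouldHandOffQueue` stands for the point at which a second thread would be taken from the pool, and `onCallReturned` decides what happens to a Task once its `call()` finally returns. The 100 ms threshold comes from the proposal; the timeout limit and the choice to re-queue a Task after a single timeout are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical decision rules for dynamic thread sharing; the actual thread
// hand-off inside the pool is not modeled here.
final class DynamicSharePolicy {
    enum Decision { REQUEUE, FINISHED, PROMOTE_TO_EXCLUSIVE }

    private final long callTimeoutMillis; // e.g. 100 ms, as in the proposal
    private final int maxTimeouts;        // assumed limit before a Task gets its own thread
    private final Map<String, Integer> timeoutCounts = new HashMap<>();

    DynamicSharePolicy(long callTimeoutMillis, int maxTimeouts) {
        this.callTimeoutMillis = callTimeoutMillis;
        this.maxTimeouts = maxTimeouts;
    }

    // Checked while call() is still running: once it overruns, another thread
    // should take over the shared queue so the remaining Tasks keep moving.
    boolean shouldHandOffQueue(long elapsedMillis) {
        return elapsedMillis > callTimeoutMillis;
    }

    // Invoked when a call() returns, with its total duration and whether the Task is done.
    Decision onCallReturned(String taskId, long elapsedMillis, boolean taskDone) {
        if (elapsedMillis > callTimeoutMillis) {
            int count = timeoutCounts.merge(taskId, 1, Integer::sum); // "marked as timed out once"
            if (!taskDone && count >= maxTimeouts) {
                return Decision.PROMOTE_TO_EXCLUSIVE; // leaves the shared queue for good
            }
        }
        return taskDone ? Decision.FINISHED : Decision.REQUEUE;
    }

    int timeoutCount(String taskId) { return timeoutCounts.getOrDefault(taskId, 0); }
}
```

With a 100 ms threshold and a limit of 2, a Task whose `call()` overruns twice is promoted to an exclusive thread, while a Task that overruns only once stays in the shared queue.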
   
   The related execution process is as follows:
   
   <img width="1117" alt="image" src="https://user-images.githubusercontent.com/83933160/181309581-7b8e077e-0c83-445d-908a-a4646eaecf2e.png">
   <img width="1124" alt="image" src="https://user-images.githubusercontent.com/83933160/181309620-01277e29-bcb4-4cf4-93d9-fde0477c2bee.png">
   <img width="1092" alt="image" src="https://user-images.githubusercontent.com/83933160/181309640-a2fb6a04-b2e2-40d8-81db-9517f7f4a502.png">
   <img width="1084" alt="image" src="https://user-images.githubusercontent.com/83933160/181309673-6899c9ec-ba39-4ff2-b721-5dd9810073c9.png">
   <img width="1078" alt="image" src="https://user-images.githubusercontent.com/83933160/181309694-5561b05b-5bd9-40ca-92d5-21f88a1dc8a1.png">
   <img width="1158" alt="image" src="https://user-images.githubusercontent.com/83933160/181309745-87e04a38-3a38-42ef-9e46-77d71695f28d.png">
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
ic4y commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1197739548

   @mosence Thanks for your comment
   
   1. Does the exclusive thread given to the timed-out Task1 have a timeout?
          No. Exclusive threads are designed without a timeout, and call() may run for any length of time, because a Task may belong to either a real-time job or an offline job.
          As for `If not: how should a timeout failure of the overall execution be designed?`, I don't understand what this means; could you please explain in detail?
   
   2. Is the exclusive thread separated from the original thread pool?
          This has not been designed in detail yet. Whether it is separated from the thread pool or not, I don't think it will cause tasks to time out, because the thread pool used here does not limit the number of threads (similar to `cachedThreadPool`), since the call() of several Tasks may time out during execution. As for limiting the number of threads, I think it should be done indirectly, by limiting the number of running Tasks.
          Exclusive threads can be managed separately; a thread is released automatically once the Task it is responsible for has finished.
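The answer above (an unbounded, `cachedThreadPool`-like pool, with the thread count limited indirectly by the number of running Tasks) can be sketched with `Executors.newCachedThreadPool()` and a `Semaphore`. `ExclusiveThreadManager` and `trySubmit` are hypothetical names; the sketch imposes no per-call timeout and frees the Task's slot as soon as the Task finishes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

enum ProgressState { MADE_PROGRESS, DONE }

interface Task { ProgressState call() throws Exception; }

// Hypothetical manager for exclusive threads: the pool itself is unbounded,
// and the number of threads is capped indirectly by capping running Tasks.
final class ExclusiveThreadManager implements AutoCloseable {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Semaphore runningTasks;

    ExclusiveThreadManager(int maxRunningTasks) {
        this.runningTasks = new Semaphore(maxRunningTasks);
    }

    // Returns null instead of blocking when the running-Task limit is reached.
    Future<Integer> trySubmit(Task task) {
        if (!runningTasks.tryAcquire()) {
            return null;
        }
        try {
            return pool.submit(() -> {
                try {
                    int calls = 0;
                    // No per-call timeout: an exclusive thread lets call() run as long as it needs.
                    while (true) {
                        calls++;
                        if (task.call() == ProgressState.DONE) return calls;
                    }
                } finally {
                    runningTasks.release(); // Task finished (or failed): free its slot
                }
            });
        } catch (RuntimeException e) {
            runningTasks.release(); // submission rejected: give the slot back
            throw e;
        }
    }

    @Override
    public void close() { pool.shutdown(); }
}
```

`newCachedThreadPool()` reclaims threads that stay idle for 60 seconds, so the thread of a finished exclusive Task is eventually released without extra bookkeeping.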
   




[GitHub] [incubator-seatunnel] ic4y commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
ic4y commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1197947780

   @mosence 
   
   My understanding is the exact opposite.
         In the offline (batch) scenario, I think a job is divided into many Tasks. Executing these Tasks in batches on a fixed number of threads is the most efficient approach, because each Task is executed to completion by one thread, with no thread scheduling, switching, or waiting along the way.
   
   Shared threads bring a benefit in real-time synchronization scenarios (unbounded data processing). For example, when thousands to tens of thousands of small tables are synchronized in real time, the data volume is actually not large, but the number of jobs is, which produces a very large number of Tasks. Starting one thread per Task is clearly not cost-effective here, and Java has no good coroutine implementation, so sharing threads is a way to optimize this.
   
   You mentioned that there must be at least one process that continuously ingests data. I don't think this is necessary, because a Task is designed to be driven by repeated calls to its call() method, so a single thread can be responsible for calling call() on many Tasks.
   




[GitHub] [incubator-seatunnel] ic4y commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
ic4y commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1197078644

   Chinese and English design pictures:
   
   
   ![TaskExecutionServer related design (Chinese) drawio](https://user-images.githubusercontent.com/83933160/181310713-a01bd628-84bb-4dfd-8861-f5267f62a57d.png)
   
   ![TaskExecutionServer related design (English) drawio](https://user-images.githubusercontent.com/83933160/181311013-6b13eac8-de02-4c21-8575-2b2aab953f74.png)
   




[GitHub] [incubator-seatunnel] ic4y commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
ic4y commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1200616244

   @mosence 
   Thanks for your comment. The `dynamic thread share` feature is being coded now. Once it is finished, let's continue the discussion.




[GitHub] [incubator-seatunnel] mosence commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
mosence commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1198806815

   @ic4y 
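   So this is an optimization aimed at reducing Task call latency? When the number of Tasks exceeds the number of threads currently in the pool, creating new threads cannot be avoided.
   You move the delayed thread into an extra, elastic thread space so that the pool can obtain new threads to execute the queued Tasks?
   Why not simply raise the maximum size of the thread pool? If a change really is needed, I think designing support for an elastic (resizable) thread pool would be simpler and more convenient.
   
   Also, I think what should be shared is a connection pool, not a thread pool.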
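For comparison, the elastic pool suggested here is available in the JDK as a `ThreadPoolExecutor` whose maximum size can be changed at runtime with `setMaximumPoolSize`. The `ElasticPool` wrapper and the sizes used are illustrative only.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around a JDK elastic pool: threads beyond the core size are
// created on demand and reclaimed after staying idle for the keep-alive time.
final class ElasticPool {
    static ThreadPoolExecutor create(int core, int max) {
        return new ThreadPoolExecutor(
                core, max,
                60, TimeUnit.SECONDS,        // idle non-core threads are reclaimed after 60 s
                new SynchronousQueue<>());   // direct hand-off: a thread is started when none is idle
    }

    // Raise (or lower) the ceiling at runtime, e.g. when many call()s are running long.
    // The JDK throws IllegalArgumentException if newMax <= 0 or newMax < core size.
    static void resize(ThreadPoolExecutor pool, int newMax) {
        pool.setMaximumPoolSize(newMax);
    }
}
```

With a `SynchronousQueue`, a task submitted while all `max` threads are busy is rejected (by default with a `RejectedExecutionException`), so the ceiling still has to be chosen or adjusted with care.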




Re: [I] [ST-Engine][Design] TaskExecutionService and Task related design [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X closed issue #2279: [ST-Engine][Design]  TaskExecutionService and Task related design
URL: https://github.com/apache/seatunnel/issues/2279




[GitHub] [incubator-seatunnel] mosence commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
mosence commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1197630734

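   I have a few questions:
   1. Does the exclusive thread given to the timed-out Task1 have a timeout?
   If yes: then there are cases where this Task can never finish.
   If not: how should a timeout failure of the overall execution be designed?
   2. Is the exclusive thread separated from the original thread pool?
   If yes: with a pool of 3 threads, three timeouts that turn Tasks exclusive would also block the job.
   If not: how are these exclusive threads managed?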


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] mosence commented on issue #2279: [ST-Engine][Design] TaskExecutionService and Task related design

Posted by GitBox <gi...@apache.org>.
mosence commented on issue #2279:
URL: https://github.com/apache/incubator-seatunnel/issues/2279#issuecomment-1197802851

   @ic4y 
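   Thanks for your answer.
   Regarding the "If not" in my first question, perhaps I don't fully understand the problem this design is meant to solve.
   1. As I understand unbounded data processing, there must be at least one process that continuously ingests data, while the data is divided into windows by a window thread or by logic. There is no thread-sharing problem there.
   2. So the thread-sharing problem only exists in bounded data processing: only when bounded data has to be split into small tasks for ingestion would threads be shared for performance, turning the original processing of each small task on its own parallel thread into serial processing on a fixed number of threads. To handle bounded data, then, there must be a notion of timeout to fail fast, so that the cause of slow data processing can be found earlier.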
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org