Posted to issues@tez.apache.org by "Yingda Chen (JIRA)" <ji...@apache.org> on 2018/10/05 19:27:00 UTC

[jira] [Created] (TEZ-3997) Enable CONCURRENT edge

Yingda Chen created TEZ-3997:
--------------------------------

             Summary: Enable CONCURRENT edge
                 Key: TEZ-3997
                 URL: https://issues.apache.org/jira/browse/TEZ-3997
             Project: Apache Tez
          Issue Type: New Feature
            Reporter: Yingda Chen


A better formatted (and commentable) google doc with figures can be found at 

[https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]

 
h1. *Enable CONCURRENT edge in Tez*

 
h2. *Motivation*

Tez was designed, from day one, to be a unifying framework for building data processing applications [1], with the promise to support different workloads. Yet the focus of Tez has largely been placed on supporting batch data processing, such as Hive/Pig. For those applications, the edge SchedulingType is usually modeled as SEQUENTIAL, with the between-vertices shuffle implemented on top of Tez APIs. We believe that there are legitimate needs to fully enable the CONCURRENT SchedulingType, which breaks away from the assumption that a destination vertex can only be scheduled after (part of) the tasks in the source vertex have been completed.

There are various scenarios where the CONCURRENT scheduling type can be helpful, such as gang scheduling of the whole DAG, or a refined version that gang-schedules a sub-graph of a DAG, known as “bubble scheduling” [2]. In addition, we have found that for Tez to truly unify workloads other than conventional MR or SQL-like applications, the need for CONCURRENT scheduling becomes more pressing. For example, a parameter-server application [3] can be modeled as the DAG below, where *PS* denotes the vertex that hosts the parameter servers, and *W* denotes the vertex that hosts the workers responsible for the heavy-lifting data processing. There are two fundamental assumptions that must be satisfied for a parameter-server application to work:

 

see [google doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing] for figure

Fig. 1, Parameter-server modeled as a DAG with concurrent edge

 
 # All servers (i.e., all tasks in PS vertex) must be up and running before any worker (task in W vertex) can make meaningful progress
 # All servers must run concurrently with the workers through the lifetime of the job

 

Note that one salient trait of the above example is that the EPHEMERAL data source type comes hand-in-hand with the CONCURRENT scheduling type. While this is what we have found to be true in many practical scenarios, the original design of Tez, which keeps DataSourceType and SchedulingType orthogonal, remains more descriptive, and the proposed changes here would keep that intact.

 

Overall, we believe that the fundamental design of the Tez framework, such as the pluggable Edge/Vertex managers and versatile edge types, provides the customizability needed to enable the various scenarios described above, and we propose to make the following changes.

 
h2. *Proposed Changes*

To address the above issues, we propose the following changes:
 # (No API change) Allow the CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType.
 # (Some API change/addition) Extend the VertexManagerPlugin interface to allow for relevant event notifications.
 # Enable a downstream vertex connected to an EPHEMERAL data source to reason about network connections of upstream tasks.
 # Allow a mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex.

 

The details for the proposed changes are provided in the following sections.
 * *Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType*

 
|Note: There is no API change in this proposed change. The majority of this change will be lifting some existing constraints against the CONCURRENT edge type, plus the addition of a VertexManagerPlugin implementation.|

 

This includes enabling the CONCURRENT SchedulingType as a valid edge property, by removing the sanity checks against CONCURRENT during DAG construction/execution. A new VertexManagerPlugin (namely VertexManagerWithConcurrentInput) will be implemented for vertices with incoming concurrent edge(s).

In addition, we will assume in this change that
 * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges
 * No shuffle or data movement is handled by the Tez framework when two vertices are connected through a CONCURRENT edge. Instead, the application runtime is responsible for handling all data-plane communications (as proposed in [1]).

Note that the above assumptions are common for scenarios such as whole-DAG or sub-graph gang scheduling, but they may be relaxed in later implementations, which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
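The relaxed sanity check can be sketched as below. This is an illustrative, self-contained snippet (the class and method names are hypothetical, not Tez source): CONCURRENT is no longer rejected outright during DAG validation; only a mixture of SEQUENTIAL and CONCURRENT incoming edges on one vertex is rejected, per the assumption above.

```java
import java.util.EnumSet;
import java.util.List;

/**
 * Illustrative sketch of the relaxed DAG-construction check.
 * All-SEQUENTIAL or all-CONCURRENT incoming edges are valid;
 * a mixture is rejected under the initial assumption.
 */
public class ConcurrentEdgeCheck {
  public enum SchedulingType { SEQUENTIAL, CONCURRENT }

  /** Returns true if the incoming edge scheduling types form a valid combination. */
  public static boolean validIncomingEdges(List<SchedulingType> incoming) {
    EnumSet<SchedulingType> seen = EnumSet.noneOf(SchedulingType.class);
    seen.addAll(incoming);
    // At most one distinct scheduling type among the incoming edges.
    return seen.size() <= 1;
  }
}
```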

 

Most of the (meaningful) scheduling decisions in Tez today are made based on the notion of (or an extended version of) source task completion. This will no longer be true in the presence of a CONCURRENT edge. Instead, events such as source vertex configured, or source task running, become more relevant when making scheduling decisions for two vertices connected via a CONCURRENT edge. We therefore introduce a new enum *ConcurrentSchedulingType* to describe the “scheduling timing” for the downstream vertex in such scenarios.
|public enum ConcurrentSchedulingType {
  /** Trigger downstream vertex task scheduling on the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Trigger downstream vertex task scheduling on the "running" event of upstream tasks. */
  SOURCE_TASK_STARTED
}|

 

Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the scheduling type, which suffices for scenarios of whole-DAG or sub-graph gang scheduling, where we want (all the tasks in) the downstream vertex to be scheduled together with (all the tasks in) the upstream vertex. In this case, we can leverage the existing onVertexStateUpdated() interface of VertexManagerPlugin to collect relevant information to assist the scheduling decision, and *there is no additional API change necessary*. However, in more subtle cases such as the parameter-server example described in Fig. 1, other scheduling types would be more relevant; therefore the placeholder for *ConcurrentSchedulingType* will be introduced in this change as part of the infrastructure work.
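The SOURCE_VERTEX_CONFIGURED behavior can be sketched as below. This is a simplified, self-contained illustration (the class name and scheduleTask hook are hypothetical, not the actual Tez plugin): the manager waits until every upstream vertex has reported the "configured" state, mirroring what the real plugin would learn through onVertexStateUpdated(), and then schedules all of its own tasks in one batch, yielding (sub-graph) gang scheduling.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Illustrative sketch of gang scheduling on SOURCE_VERTEX_CONFIGURED:
 * no downstream task is scheduled until all upstream vertices are
 * configured, then all tasks are scheduled together.
 */
public class GangSchedulingSketch {
  private final Set<String> pendingSources;
  private final int numTasks;
  private boolean scheduled = false;

  public GangSchedulingSketch(Set<String> upstreamVertices, int numTasks) {
    this.pendingSources = new HashSet<>(upstreamVertices);
    this.numTasks = numTasks;
  }

  /** Mirrors an onVertexStateUpdated(...) notification for VERTEX_CONFIGURED. */
  public void onSourceVertexConfigured(String vertexName) {
    pendingSources.remove(vertexName);
    if (pendingSources.isEmpty() && !scheduled) {
      scheduled = true;
      for (int i = 0; i < numTasks; i++) {
        scheduleTask(i); // in Tez this would go through the plugin context
      }
    }
  }

  /** Placeholder for the actual container/task-scheduling request. */
  protected void scheduleTask(int taskIndex) { }

  public boolean allTasksScheduled() { return scheduled; }
}
```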

 

Finally, since we assume that all communications between two vertices connected via a CONCURRENT edge are handled by the application runtime, a CONCURRENT edge will be assigned a DummyEdgeManager that mutes all DME/VME handling.
 # *Extend the VertexManagerPlugin interface to allow for relevant event notifications*

For a concurrent connection, the downstream and upstream vertices run concurrently, and in some cases they are scheduled at the same time as well, as in (sub-graph) gang scheduling. However, *this is not always true*. In the example in Fig. 1, tasks in the PS vertex should be running before tasks in the W vertex are scheduled; otherwise, if the resource requests for PS cannot be fulfilled first, W will spin in vain. In other examples, as long as some of the tasks in the upstream vertex are running, we can start scheduling downstream tasks.

 

In other words, if we put this into the context of the existing interface/implementation of VertexManagerPlugin, we can see a strong duality between “OnSourceTaskRunning” for concurrent connections and the (existing) “OnSourceTaskCompleted” for sequential connections. Therefore, we propose adding an “_onConcurrentSourceTaskRunning(TaskAttemptIdentifier attempt)_” interface to VertexManagerPlugin, with the default implementation marked as not supported.
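The default-unsupported behavior can be sketched as below. This is a self-contained illustration (the surrounding class and the placeholder TaskAttemptIdentifier are simplified stand-ins for the Tez types; only the new method follows the proposal): existing plugins that never see a CONCURRENT edge are unaffected, while a plugin that receives the callback without overriding it fails loudly.

```java
/**
 * Illustrative sketch of the proposed VertexManagerPlugin addition.
 * The default implementation signals "not supported" so that existing
 * plugin implementations remain valid without changes.
 */
public abstract class ConcurrentAwarePluginSketch {
  /** Simplified stand-in for Tez's TaskAttemptIdentifier. */
  public interface TaskAttemptIdentifier { int getTaskIndex(); }

  /** Dual of onSourceTaskCompleted(...) for CONCURRENT connections. */
  public void onConcurrentSourceTaskRunning(TaskAttemptIdentifier attempt) {
    throw new UnsupportedOperationException(
        "onConcurrentSourceTaskRunning is not supported by this plugin");
  }
}
```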

 

This change will also include the logic to generate source task running events and to send such events to downstream vertices. To reduce unnecessary event traffic, we will limit the sending of such events to CONCURRENT edges, and only when the ConcurrentSchedulingType is specified as SOURCE_TASK_STARTED.
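The gating rule reduces to a single predicate, sketched below with local placeholder enums (illustrative only, not Tez source): a source-task-running notification is forwarded only when the connecting edge is CONCURRENT and the downstream vertex requested SOURCE_TASK_STARTED scheduling; in every other combination the event is suppressed.

```java
/**
 * Illustrative sketch of the event-gating rule that limits
 * source-task-running event traffic.
 */
public class SourceTaskRunningGate {
  public enum SchedulingType { SEQUENTIAL, CONCURRENT }
  public enum ConcurrentSchedulingType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

  /** True only for a CONCURRENT edge whose downstream uses SOURCE_TASK_STARTED. */
  public static boolean shouldSendRunningEvent(SchedulingType edgeType,
      ConcurrentSchedulingType schedulingType) {
    return edgeType == SchedulingType.CONCURRENT
        && schedulingType == ConcurrentSchedulingType.SOURCE_TASK_STARTED;
  }
}
```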

 
 # *Enable a downstream vertex connected to an EPHEMERAL data source to reason about network connections of upstream tasks*

Another property that is usually shared with CONCURRENT on the same edge is the EPHEMERAL data source type. When two vertices are running concurrently, direct communication between tasks in those vertices becomes possible, and oftentimes necessary, throughout the lifetime of the running tasks. This can be articulated by an EPHEMERAL data source, and this change aims to support such scenarios, which are readily found in real-time applications (such as interactive query) and/or customized applications that would like to control their own data communications (such as parameter-server).

 

This change will allow Tez to act as the central orchestrator that gathers the necessary network information from all upstream tasks, compiles it together, and sends it to downstream tasks. In particular, the following changes are planned:
 # For two vertices connected via an edge with both the CONCURRENT scheduling type and the EPHEMERAL data source type, each task in the upstream vertex will open a network port and send a VertexManagerEvent (VME) immediately upon running. The payload of the VME includes the information necessary to communicate with this task through direct network communication (such as IP and port). The vertex manager of the downstream vertex, of type VertexManagerWithConcurrentInput, will receive these VMEs and is responsible for aggregating (including de-duplicating if necessary) all the information in onVertexManagerEventReceived().
 # Once all VMEs have been received, a CustomProcessorEvent will be constructed with a payload that includes the aggregated information, and routed to downstream tasks.

The change will introduce additional optional entries in the VertexManagerEvent payload and a new custom payload that will be embedded into CustomProcessorEvent.
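The aggregation step can be sketched as below. This is a self-contained illustration (class, method, and payload format are hypothetical simplifications of what the vertex manager would do): each upstream task reports a connection endpoint, the manager de-duplicates the endpoints, and once the expected number of reports has arrived the combined payload is ready to be routed downstream in a CustomProcessorEvent.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Illustrative sketch of endpoint aggregation by the downstream vertex
 * manager: collect "host:port" strings from upstream VMEs, de-duplicate
 * them, and build one aggregate payload when all reports are in.
 */
public class EndpointAggregator {
  private final int expectedReports;
  private int received = 0;
  // LinkedHashSet de-duplicates while preserving arrival order.
  private final Set<String> endpoints = new LinkedHashSet<>();

  public EndpointAggregator(int expectedReports) {
    this.expectedReports = expectedReports;
  }

  /** Mirrors onVertexManagerEventReceived(...) for one upstream task's VME. */
  public void onEndpointReported(String hostAndPort) {
    received++; // counts reports; a retrying task would re-report its endpoint
    endpoints.add(hostAndPort);
  }

  /** True once every upstream task has reported. */
  public boolean complete() { return received >= expectedReports; }

  /** Aggregate payload, e.g. to embed in a CustomProcessorEvent. */
  public String aggregatePayload() { return String.join(",", endpoints); }
}
```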

 

Upon completion of the functional features in this change, additional features such as failover handling on CONCURRENT/EPHEMERAL edges will be addressed in future umbrella JIRAs.

 
 # *Allow a mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex*

In the above changes, we assumed that a vertex’s incoming edges all have the same edge property in terms of SchedulingType, i.e., they are either all SEQUENTIAL or all CONCURRENT.

We shall move beyond this assumption in this change to allow a mixture of different incoming edge types, as exemplified in Fig. 2.

see [google doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing] for figure

Fig. 2 Vertex with mixture of input edges

This change will mainly focus on enriching the VertexManagerPlugin implementation introduced in the first change, namely VertexManagerWithConcurrentInput. No API change is expected.

 
h2. Reference

[1] Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications, SIGMOD ’15. [http://dl.acm.org/authorize?N97131]

[2] Bubble Execution: Resource-aware Reliable Analytics at Cloud Scale, VLDB 2018. [http://www.vldb.org/pvldb/vol11/p746-yin.pdf]

[3] Distributed TensorFlow. [https://www.tensorflow.org/deploy/distributed]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)