Posted to issues@tez.apache.org by "Jeff Zhang (JIRA)" <ji...@apache.org> on 2015/06/18 09:45:02 UTC

[jira] [Comment Edited] (TEZ-1273) Refactor DAGAppMaster to state machine based

    [ https://issues.apache.org/jira/browse/TEZ-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591400#comment-14591400 ] 

Jeff Zhang edited comment on TEZ-1273 at 6/18/15 7:44 AM:
----------------------------------------------------------

bq. writeLock is used in the statemachine handle() call but readLock is not used anywhere. Doesn't all read access to data that is updated within the state transitions need to be protected by a read lock?
Currently, DAGAppMaster exposes itself to other components through RunningAppContext. Ideally, access through RunningAppContext should be protected by the read lock, but that could easily introduce deadlocks. For example, in DAGAppMaster#getAMState I read a volatile state field kept in DAGAppMaster rather than querying the state machine object on every call, to avoid a deadlock. Since most of the fields accessed through RunningAppContext do not change once the AM has started, reading them without the read lock should be safe. In the future, though, we do need to consider better ways to access RunningAppContext.
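Here is a minimal sketch of the getAMState pattern described above. AmState and AmStateHolder are hypothetical names, not Tez classes, and the real DAGAppMaster holds far more state. Transitions hold the write lock, and the getter reads a volatile copy of the state. The transition below waits on another thread that reads the state while the write lock is still held. That is the kind of call chain in which a read-locked getter would deadlock.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for DAGAppMasterState.
enum AmState { NEW, RUNNING, IDLE }

class AmStateHolder {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Written only under the write lock; volatile so lock-free readers see it.
  private volatile AmState state = AmState.NEW;

  // Lock-free read: safe from any thread, even while a transition
  // is holding the write lock.
  AmState getAMState() {
    return state;
  }

  // Mimics a transition that, while holding the write lock, synchronously
  // waits on another component whose thread reads the AM state.
  AmState transitionAndQueryFromOtherThread(AmState next) {
    ExecutorService other = Executors.newSingleThreadExecutor();
    lock.writeLock().lock();
    try {
      state = next;
      Callable<AmState> reader = this::getAMState;
      Future<AmState> seen = other.submit(reader);
      return seen.get(5, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException("reader could not observe the new state", e);
    } finally {
      lock.writeLock().unlock();
      other.shutdown();
    }
  }
}
```

Suppose getAMState() took lock.readLock() instead. The reader thread would then park until the writer released the lock, and seen.get would time out.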

bq. would it make the code less complex if we had different transitions for dag_succeeded, dag_failed, dag_internal_error, dag_killed?
I'm afraid not. They share a lot of common code in DAGAppMaster#dagComplete and differ only in the following part. Keeping a single transition also means fewer code changes, which makes the patch easier to review.
{code}
    switch (dagFinishedEvent.getDAGState()) {
      case SUCCEEDED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          successfulDAGs.incrementAndGet();
        }
        break;
      case FAILED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          failedDAGs.incrementAndGet();
        }
        break;
      case KILLED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          killedDAGs.incrementAndGet();
        }
        break;
      case ERROR:
        // An internal error is counted as a failed DAG.
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          failedDAGs.incrementAndGet();
        }
        break;
    }
{code}
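The sketch below shows how little actually differs between the four outcomes by hoisting the repeated prewarm check out of the switch. It is not the patch's code. FinalDagState, DagCounters and onDagFinished are hypothetical names, and the prewarm prefix is passed in rather than read from TezConstants.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subset of the final DAG states handled in dagComplete.
enum FinalDagState { SUCCEEDED, FAILED, KILLED, ERROR }

class DagCounters {
  final AtomicInteger successfulDAGs = new AtomicInteger();
  final AtomicInteger failedDAGs = new AtomicInteger();
  final AtomicInteger killedDAGs = new AtomicInteger();
  private final String prewarmPrefix;

  DagCounters(String prewarmPrefix) {
    this.prewarmPrefix = prewarmPrefix;
  }

  void onDagFinished(String dagName, FinalDagState state) {
    // Prewarm DAGs are excluded from all counters, so check once up front.
    if (dagName.startsWith(prewarmPrefix)) {
      return;
    }
    switch (state) {
      case SUCCEEDED:
        successfulDAGs.incrementAndGet();
        break;
      case FAILED:
      case ERROR: // counted as a failure, as in the snippet above
        failedDAGs.incrementAndGet();
        break;
      case KILLED:
        killedDAGs.incrementAndGet();
        break;
      default:
        break;
    }
  }
}
```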

bq. this transition seems a bit confusing. We should make it more clear as to what it is trying to do. Also the comment on submit dag in non-session mode does not seem right.
bq. Related to the above, we have the rpc/api layer and the state machine layer. Some of the calls need to be synchronous and others are handed off the state machine. This leads a possibility of race conditions such as 2 dags being submitted at the same time while the AM is in an idle state. Sending an event to the state machine here would likely not work. We may need to explicitly state to RUNNING from IDLE to prevent such races. Any thoughts on handling this?
I introduced a new event type, DAG_SUBMITTED, to represent a new DAG being submitted through RPC. I also renamed NEW_DAG_SUBMITTED to DAG_PREPARE, which notifies other components that a new DAG has been submitted.
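The review describes a race in which two DAGs are submitted while the AM is IDLE. One way to close it is the reviewer's suggestion: move from IDLE to RUNNING synchronously on the RPC path, before anything is handed to the state machine. Below is a minimal sketch that assumes a single lock guards the check-and-set. SessionAm, SessionState and this submitDag are hypothetical simplifications, not the patch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, reduced set of session states.
enum SessionState { IDLE, RUNNING }

// Sketch: the RPC-side check-and-set runs synchronously under one lock, so
// of several submissions arriving while the AM is IDLE exactly one wins.
class SessionAm {
  private final ReentrantLock lock = new ReentrantLock();
  private SessionState state = SessionState.IDLE;
  private String currentDag;

  // Returns true if the DAG was accepted. In a real AM the accepted DAG
  // would then be handed to the state machine as an event.
  boolean submitDag(String dagName) {
    lock.lock();
    try {
      if (state != SessionState.IDLE) {
        return false; // another DAG already owns the session
      }
      state = SessionState.RUNNING;
      currentDag = dagName;
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Called when the running DAG completes; the session becomes IDLE again.
  void dagFinished() {
    lock.lock();
    try {
      state = SessionState.IDLE;
      currentDag = null;
    } finally {
      lock.unlock();
    }
  }

  String getCurrentDag() {
    lock.lock();
    try {
      return currentDag;
    } finally {
      lock.unlock();
    }
  }
}
```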

bq. Why is an internal error ignorable? Can this cause the AM to remain infinitely in a terminating state?
Fixed.

bq. catch alls hide bugs. We should remove the above.
Fixed.

bq. does work within serviceInit/serviceStart/serviceStop need to be within a write lock?
Previously they were in synchronized blocks. I think this was to keep DAGAppMasterShutdownHook from running while the AM is still initializing or starting. It prevents these 3 methods from being invoked at the same time from different threads.
It also prevents shutdownTezAM and submitDAGToAppMaster from being invoked at the same time (both are synchronized as well). The write lock here simply replaces the synchronized keyword.
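This sketch shows why a write lock can stand in for synchronized here. It assumes a simplified lifecycle: LockedLifecycle is hypothetical, and only the method names echo the ones above. Like a Java monitor, the write lock of ReentrantReadWriteLock is reentrant. So shutdownTezAM can call serviceStop while holding it. A stop arriving from another thread, such as a shutdown hook, is serialized behind it and becomes a no-op.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: the write lock plays the role `synchronized` played before.
// The method names echo the comment above; the bodies are hypothetical.
class LockedLifecycle {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final List<String> log = new ArrayList<>(); // guarded by rwLock
  private boolean stopped; // guarded by rwLock

  void serviceStop() {
    rwLock.writeLock().lock();
    try {
      if (stopped) {
        return; // a second stop, e.g. from a shutdown hook, is a no-op
      }
      stopped = true;
      log.add("stopped");
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  // Like a synchronized method calling another synchronized method: the
  // write lock is reentrant, so calling serviceStop() here does not deadlock.
  void shutdownTezAM() {
    rwLock.writeLock().lock();
    try {
      log.add("shutdown requested");
      serviceStop();
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  // Readers only need the read lock.
  List<String> events() {
    rwLock.readLock().lock();
    try {
      return new ArrayList<>(log);
    } finally {
      rwLock.readLock().unlock();
    }
  }
}
```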

bq. can serviceStop be triggered by something outside of the state machine?
Yes. It can happen when DAGAppMasterShutdownHook is invoked because the AM receives a termination signal.

bq. precondition checks are not useful for a state machine. This will throw an exception from the handle function and lead to an internal error and the AM shutting down.
Fixed.
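Here is the review point made concrete. If handle() turns any exception from a transition into an internal error, then a precondition check inside a transition is effectively fatal. The sketch assumes that behavior for a hypothetical TinyMachine. It contrasts a strict transition with one that records and ignores an unexpected event. It does not show how the patch itself was changed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, reduced AM states.
enum MachineState { RUNNING, IDLE, ERROR }

// Assumed behaviour, as described in the review: handle() turns any
// exception thrown by a transition into an internal error (ERROR).
class TinyMachine {
  private MachineState state = MachineState.RUNNING;
  final List<String> diagnostics = new ArrayList<>();

  void handle(UnaryOperator<MachineState> transition) {
    try {
      state = transition.apply(state);
    } catch (RuntimeException e) {
      diagnostics.add("internal error: " + e.getMessage());
      state = MachineState.ERROR;
    }
  }

  MachineState getState() {
    return state;
  }

  // Precondition style: an unexpected state throws, which is fatal here.
  static MachineState strictDagCompleted(MachineState current) {
    if (current != MachineState.RUNNING) {
      throw new IllegalStateException("DAG completed in state " + current);
    }
    return MachineState.IDLE;
  }

  // Tolerant style: an unexpected event is recorded and ignored.
  MachineState tolerantDagCompleted(MachineState current) {
    if (current != MachineState.RUNNING) {
      diagnostics.add("ignoring DAG completion in state " + current);
      return current;
    }
    return MachineState.IDLE;
  }
}
```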

bq. For ShutdownWhenRunningTransition, shouldn't the sessionStopped flag be set? What will happen after the dag completion ( after kill ) comes back? In this case, won't the AM go back to idle state for a session?

bq. confusing comment. Not matching code.
Fixed.

bq. Also, if the version mismatch error occurs and causes AM to go into ERROR state - how does the AM unregister from RM and update diagnostics?
The diagnostics have already been updated when the versions are compared, and the AM will unregister from the RM:
{code}
versionMismatchDiagnostics = "Incompatible versions found"
    + ", clientVersion=" + clientVersion
    + ", AMVersion=" + dagVersionInfo.getVersion();
addDiagnostic(versionMismatchDiagnostics);
{code}

{code}
if (versionMismatch) {
  // Short-circuit and return, as no DAG should be run
  LOG.info("Version Mismatch, shutting down AM");
  this.taskSchedulerEventHandler.setShouldUnregisterFlag();
  shutdownHandler.shutdown();
  return DAGAppMasterState.ERROR;
}
{code}
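The ordering above fits in a self-contained sketch. The diagnostic is recorded first, then the unregister flag is set and shutdown is requested, and the transition returns ERROR. VersionCheckSketch and its fields are hypothetical stand-ins for addDiagnostic, setShouldUnregisterFlag and shutdownHandler.shutdown. Exact string equality is only an assumed stand-in for the real version comparison.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical outcome of the start-up check.
enum StartOutcome { RUNNING, ERROR }

class VersionCheckSketch {
  final List<String> diagnostics = new ArrayList<>(); // stands in for addDiagnostic
  boolean shouldUnregister;   // stands in for setShouldUnregisterFlag()
  boolean shutdownRequested;  // stands in for shutdownHandler.shutdown()

  StartOutcome checkVersions(String clientVersion, String amVersion) {
    // Assumption: exact string equality instead of the real comparison.
    if (!clientVersion.equals(amVersion)) {
      // 1. Record why the AM is failing so it ends up in the final diagnostics.
      diagnostics.add("Incompatible versions found"
          + ", clientVersion=" + clientVersion
          + ", AMVersion=" + amVersion);
      // 2. Make sure the AM unregisters from the RM on the way out.
      shouldUnregister = true;
      // 3. Start shutting down; no DAG should be run.
      shutdownRequested = true;
      return StartOutcome.ERROR;
    }
    return StartOutcome.RUNNING;
  }
}
```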

{code}
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.SESSION_TIMEOUT));
{code}
bq. does this introduce a race with submitDag() ?
Fixed by handling the session timeout event synchronously.
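An asynchronous SESSION_TIMEOUT event leaves a window for a race:
* the timer decides the session is idle;
* a submitDag call then moves the AM to RUNNING;
* the stale timeout event arrives afterwards.

Handling the timeout synchronously, under the same lock as submission, makes the check and the action atomic. Below is a minimal sketch that assumes one lock guards both paths. All names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, reduced session states.
enum TimeoutState { IDLE, RUNNING, SHUTTING_DOWN }

// Sketch: the session-timeout check and submitDag() share one lock, so
// "is the session idle?" and "shut it down" happen atomically.
class SessionTimeoutSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private TimeoutState state = TimeoutState.IDLE;

  boolean submitDag() {
    lock.lock();
    try {
      if (state != TimeoutState.IDLE) {
        return false; // busy, or already shutting down after a timeout
      }
      state = TimeoutState.RUNNING;
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Called by the timer thread and handled synchronously instead of being
  // queued as an asynchronous event.
  boolean onSessionTimeout() {
    lock.lock();
    try {
      if (state != TimeoutState.IDLE) {
        return false; // a DAG got in first; do not shut the session down
      }
      state = TimeoutState.SHUTTING_DOWN;
      return true;
    } finally {
      lock.unlock();
    }
  }

  TimeoutState getState() {
    lock.lock();
    try {
      return state;
    } finally {
      lock.unlock();
    }
  }
}
```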


bq. DrainDispatcher.java in tez-dag/src/test/ - any reason why we need this?
It is only for unit tests; it is used in TestMockDAGAppMaster.
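A drain dispatcher lets a test post events to an asynchronous dispatcher and then block until every event has been processed, including events posted by handlers. That way, assertions do not race the dispatcher thread. The sketch below is a minimal illustration, not the DrainDispatcher in tez-dag/src/test. MiniDrainDispatcher and its methods are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch of a "drain" dispatcher for tests: events are handled on one
// background thread, and await() blocks until every event posted so far,
// including events posted by handlers, has been handled.
class MiniDrainDispatcher<E> {
  private final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();
  private final Object drainLock = new Object();
  private int pending; // posted but not yet handled; guarded by drainLock
  private Thread worker;

  void start(Consumer<E> handler) {
    worker = new Thread(() -> {
      try {
        while (true) {
          E event = queue.take();
          try {
            handler.accept(event);
          } finally {
            // Events posted by the handler were counted before this decrement,
            // so pending cannot reach zero while follow-up work is queued.
            synchronized (drainLock) {
              pending--;
              drainLock.notifyAll();
            }
          }
        }
      } catch (InterruptedException e) {
        // stop() interrupts the worker; just exit.
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  void post(E event) {
    synchronized (drainLock) {
      pending++;
    }
    queue.add(event);
  }

  // Blocks until the queue is drained and no handler is still running.
  void await() {
    synchronized (drainLock) {
      while (pending > 0) {
        try {
          drainLock.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while draining", e);
        }
      }
    }
  }

  void stop() {
    worker.interrupt();
  }
}
```

In this sketch, a handler that throws a runtime exception ends the worker thread. A real test utility would need to decide how to surface such failures.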

bq. LocalClient changes?
This is a minor change to LocalClient, needed because we introduced a new DAGAppMasterState, RECOVERING.

bq. TestMemoryWithEvents
This is due to a change in MockTezClient to simulate recovery.


> Refactor DAGAppMaster to state machine based
> --------------------------------------------
>
>                 Key: TEZ-1273
>                 URL: https://issues.apache.org/jira/browse/TEZ-1273
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Jeff Zhang
>            Assignee: Jeff Zhang
>         Attachments: DAGAppMaster_3.pdf, DAGAppMaster_4.pdf, DAGAppMaster_5.pdf, TEZ-1273-3.patch, TEZ-1273-4.patch, TEZ-1273-5.patch, TEZ-1273-6.patch, TEZ-1273-7.patch, TEZ-1273-8.patch, TEZ-1273-9.patch, Tez-1273-2.patch, Tez-1273.patch, dag_app_master.pdf, dag_app_master2.pdf
>
>
> Almost all our entities (Vertex, Task, etc.) are state-machine based and written using a formal state machine. DAGAppMaster, however, is not written on a formal state machine even though its behavior is state-machine based. This JIRA is for refactoring it to be state-machine based.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)