You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@tez.apache.org by "Bikas Saha (JIRA)" <ji...@apache.org> on 2014/03/02 21:53:31 UTC

[jira] [Comment Edited] (TEZ-847) Support basic AM recovery

    [ https://issues.apache.org/jira/browse/TEZ-847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13917539#comment-13917539 ] 

Bikas Saha edited comment on TEZ-847 at 3/2/14 8:52 PM:
--------------------------------------------------------

Do we need to expose recovering status for the Vertex externally?
{code}+      case VERTEX_RECOVERING:
+        return VertexStatus.State.RECOVERING;{code}

We need to be careful about changing proto field numbers with recovery since an upgrade will probably cause AM restart and recovery. Probably add a note on this.
{code}-  VERTEX_ERROR = 7;
-  VERTEX_TERMINATING = 8;
+  VERTEX_RECOVERING = 2;
+  VERTEX_INITED = 3;
+  VERTEX_RUNNING = 4;{code}

Wondering if there is anything in the recovery logic that prevents us from running multiple concurrent DAGs? I am sure there will be some initial setup changes where we recover multiple DAG's. But saving recovery data and recovering multiple DAGs should not be hindered by anything in the recovery design.

If TODO is done then please remove, else please link to jira is that tracks the work
{code}+          // Recoverying dag
+          // TODO check if DAG completed, if yes update summary?
+          // nothing to do at this point, go to IDLE
+          // else if not complete{code}

Wont this depend on the AM retry number? We can/should call startDAG only if its the first attempt. Thereafter, we should only recover the DAG or fail. Its not safe to rerun the DAG in case the previous attempt actually finished the DAG. What are the cases in which DAG recovery parsing returns a null value? The AM may hang in such cases because nothing will trigger completion.
{code}-      startDAG();
+      DAG recoveredDAG = null;
+      if (appAttemptID.getAttemptId() != 0) {
+        recoveredDAG = recoverDAG();
+      }
+      if (recoveredDAG == null) {
+        dagCounter.set(0);
+        startDAG();
+      }{code}

On that node, dont see this being used anywhere?
{code} +  protected boolean isLastAMRetry = false;{code}

What if the stream is incomplete? Available > 0 but not enough data to populate the proto?
{code}+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
+    }{code}

Do we want to read int or byte? return value is int in both cases though. read() reads a byte.
{code}+      int eventTypeOrdinal = inputStream.read();{code}

Some comments in RecoveryParser or writing code would help understand the layout of the recovery data on the FS. 
Why is the recovery path have attempt id? Why cannot all attempts share the same app directory for recovery since each attempt should only be appending new information to the recovery log?
{code}+  private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
+      int attemptId) {
+    return new Path(recoveryDataDir, Integer.toString(attemptId));
+  }{code}

Saving review. More coming.



was (Author: bikassaha):
Do we need to expose recovering status for the Vertex externally?
{code}+      case VERTEX_RECOVERING:
+        return VertexStatus.State.RECOVERING;{code}

We need to be careful about changing proto field numbers with recovery since an upgrade will probably cause AM restart and recovery. Probably add a note on this.
{code}-  VERTEX_ERROR = 7;
-  VERTEX_TERMINATING = 8;
+  VERTEX_RECOVERING = 2;
+  VERTEX_INITED = 3;
+  VERTEX_RUNNING = 4;{code}

This maybe unrelated to the code but it made me think if there is something in the recovery logic that prevents us from running multiple concurrent DAGs? I am sure there will be some initial setup changes where we recover multiple DAG's. But saving recovery data and recovering multiple DAGs should not be hindered by anything in the recovery design.

If TODO is done then please remove, else please link to jira is that tracks the work
{code}+          // Recoverying dag
+          // TODO check if DAG completed, if yes update summary?
+          // nothing to do at this point, go to IDLE
+          // else if not complete{code}

Wont this depend on the AM retry number? We can/should call startDAG only if its the first attempt. Thereafter, we should only recover the DAG or fail. Its not safe to rerun the DAG in case the previous attempt actually finished the DAG. What are the cases in which DAG recovery parsing returns a null value?
{code}-      startDAG();
+      DAG recoveredDAG = null;
+      if (appAttemptID.getAttemptId() != 0) {
+        recoveredDAG = recoverDAG();
+      }
+      if (recoveredDAG == null) {
+        dagCounter.set(0);
+        startDAG();
+      }{code}

On that node, dont see this being used anywhere?
{code} +  protected boolean isLastAMRetry = false;{code}

What if the stream is incomplete? Available > 0 but not enough data to populate the proto?
{code}+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
+    }{code}

Do we want to read int or byte? return value is int in both cases though. read() reads a byte.
{code}+      int eventTypeOrdinal = inputStream.read();{code}

Some comments in RecoveryParser or writing code would help understand the layout of the recovery data on the FS. 
Why is the recovery path have attempt id? Why cannot all attempts share the same app directory for recovery since each attempt should only be appending new information to the recovery log?
{code}+  private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
+      int attemptId) {
+    return new Path(recoveryDataDir, Integer.toString(attemptId));
+  }{code}

Saving review. More coming.


> Support basic AM recovery
> -------------------------
>
>                 Key: TEZ-847
>                 URL: https://issues.apache.org/jira/browse/TEZ-847
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Hitesh Shah
>            Assignee: Hitesh Shah
>         Attachments: TEZ-847.1.patch, TEZ-847.2.patch, TEZ-847.3.patch, TEZ-847.4.patch, TEZ-847.5.patch, TEZ-847.6.patch, TEZ-847.7.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.2#6252)