Posted to mapreduce-issues@hadoop.apache.org by "zhihai xu (JIRA)" <ji...@apache.org> on 2015/02/13 06:22:11 UTC

[jira] [Updated] (MAPREDUCE-6259) -1 submit time cause IllegalArgumentException when parse the Job history file name and JOB_INIT_FAILED cause -1 submit time in JobIndexInfo.

     [ https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhihai xu updated MAPREDUCE-6259:
---------------------------------
    Description: 
A -1 submit time causes an IllegalArgumentException when the job history file name is parsed, and JOB_INIT_FAILED is what leaves the -1 submit time in JobIndexInfo.
We found the following job history file name, which causes an IllegalArgumentException when the job status is parsed out of the file name:
{code}
job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
{code}
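
For illustration, here is a self-contained demo (our own sketch, not the Hadoop parser) that splits the failing file name on the "-" delimiter, the way positional parsing of the name does, and prints the resulting tokens:
{code}
// Demo sketch (assumption: not the actual FileNameIndexUtils code): show how
// a -1 submit time breaks positional parsing of the history file name.
public class JhistNameSplitDemo {
  public static void main(String[] args) {
    String name = "job_1418398645407_115853--1-worun-"
        + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D"
        + "-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist";
    String[] tokens = name.split("-");
    for (int i = 0; i < tokens.length; i++) {
      System.out.println(i + " -> '" + tokens[i] + "'");
    }
    // The -1 submit time yields an empty token at index 1 and an extra
    // token "1" at index 2, so every later field (user, job name, finish
    // time, status, ...) is shifted one position to the right and
    // positional parsing no longer finds the values it expects.
  }
}
{code}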

When an IOException happens in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated, so it keeps its [initial value -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185].
{code}
      this.jobIndexInfo =
          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
                           queueName);
{code}

The following is the sequence of events that produces the -1 submit time:
1.
A job is created in MRAppMaster#serviceStart; after creation the new job is in state JobStateInternal.NEW:
{code}
    job = createJob(getConfig(), forcedState, shutDownMessage);
{code}

2.
JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart:
{code}
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
      // Send init to the job (this does NOT trigger job execution)
      // This is a synchronous call, not an event through dispatcher. We want
      // job-init to be done completely here.
      jobEventDispatcher.handle(initJobEvent);
{code}

3.
After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition:
{code}
          .addTransition
              (JobStateInternal.NEW,
              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
              JobEventType.JOB_INIT,
              new InitTransition())
{code}

4.
The exception is then thrown from setup(job) in InitTransition#transition, before the JobSubmittedEvent is handled.
JobSubmittedEvent is what updates the job submit time, so because of the exception the submit time is still the initial value -1.
This is the code of InitTransition#transition:
{code}
public JobStateInternal transition(JobImpl job, JobEvent event) {
      job.metrics.submittedJob(job);
      job.metrics.preparingJob(job);
      if (job.newApiCommitter) {
        job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
      } else {
        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
      }
      try {
        setup(job);
        job.fs = job.getFileSystem(job.conf);
        //log to job history
        JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
              job.conf.get(MRJobConfig.JOB_NAME, "test"), 
            job.conf.get(MRJobConfig.USER_NAME, "mapred"),
            job.appSubmitTime,
            job.remoteJobConfFile.toString(),
            job.jobACLs, job.queueName,
            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
            getWorkflowAdjacencies(job.conf),
            job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
        //TODO JH Verify jobACLs, UserName via UGI?

        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
        job.numMapTasks = taskSplitMetaInfo.length;
        job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

        if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
          job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
        } else if (job.numMapTasks == 0) {
          job.reduceWeight = 0.9f;
        } else if (job.numReduceTasks == 0) {
          job.mapWeight = 0.9f;
        } else {
          job.mapWeight = job.reduceWeight = 0.45f;
        }

        checkTaskLimits();

        long inputLength = 0;
        for (int i = 0; i < job.numMapTasks; ++i) {
          inputLength += taskSplitMetaInfo[i].getInputDataLength();
        }

        job.makeUberDecision(inputLength);
        
        job.taskAttemptCompletionEvents =
            new ArrayList<TaskAttemptCompletionEvent>(
                job.numMapTasks + job.numReduceTasks + 10);
        job.mapAttemptCompletionEvents =
            new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
        job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
            job.numMapTasks + job.numReduceTasks + 10);

        job.allowedMapFailuresPercent =
            job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
        job.allowedReduceFailuresPercent =
            job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

        // create the Tasks but don't start them yet
        createMapTasks(job, inputLength, taskSplitMetaInfo);
        createReduceTasks(job);

        job.metrics.endPreparingJob(job);
        return JobStateInternal.INITED;
      } catch (Exception e) {
        LOG.warn("Job init failed", e);
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        // Leave job in the NEW state. The MR AM will detect that the state is
        // not INITED and send a JOB_INIT_FAILED event.
        return JobStateInternal.NEW;
      }
    }
{code}

This is the code of JobImpl#setup:
{code}
    protected void setup(JobImpl job) throws IOException {

      String oldJobIDString = job.oldJobId.toString();
      String user = 
        UserGroupInformation.getCurrentUser().getShortUserName();
      Path path = MRApps.getStagingAreaDir(job.conf, user);
      if(LOG.isDebugEnabled()) {
        LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
      }

      job.remoteJobSubmitDir =
          FileSystem.get(job.conf).makeQualified(
              new Path(path, oldJobIDString));
      job.remoteJobConfFile =
          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

      // Prepare the TaskAttemptListener server for authentication of Containers
      // TaskAttemptListener gets the information via jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(oldJobIDString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      job.jobToken.setService(identifier.getJobId());
      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
      // can authenticate containers(tasks)
      job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
      LOG.info("Adding job token for " + oldJobIDString
          + " to jobTokenSecretManager");

      // If the job client did not setup the shuffle secret then reuse
      // the job token secret for the shuffle.
      if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
        LOG.warn("Shuffle secret key missing from job credentials."
            + " Using job token secret as shuffle secret.");
        TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
            job.jobCredentials);
      }
    }
{code}

5.
Due to the IOException from JobImpl#setup, the new job is still in state JobStateInternal.NEW:
{code}
      } catch (Exception e) {
        LOG.warn("Job init failed", e);
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        // Leave job in the NEW state. The MR AM will detect that the state is
        // not INITED and send a JOB_INIT_FAILED event.
        return JobStateInternal.NEW;
      }
{code}
In the following code in MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event:
{code}
      // If job is still not initialized, an error happened during
      // initialization. Must complete starting all of the services so failure
      // events can be processed.
      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
    if (initFailed) {
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      // All components have started, start the job.
      startJobs();
    }
{code}

6.
After JobImpl receives JOB_INIT_FAILED, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT:
{code}
          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_INIT_FAILED,
              new InitFailedTransition())
{code}

7.
JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition:
{code}
    public void transition(JobImpl job, JobEvent event) {
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
                job.jobContext,
                org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
{code}

8.
The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED):
{code}
    protected void handleJobAbort(CommitterJobAbortEvent event) {
      cancelJobCommit();
      try {
        committer.abortJob(event.getJobContext(), event.getFinalState());
      } catch (Exception e) {
        LOG.warn("Could not abort job", e);
      }
      context.getEventHandler().handle(new JobAbortCompletedEvent(
          event.getJobID(), event.getFinalState()));
    }
{code}

9.
After JobImpl receives JOB_ABORT_COMPLETED, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED:
{code}
          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
              JobEventType.JOB_ABORT_COMPLETED,
              new JobAbortCompletedTransition())
{code}

10.
JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent carrying the finish time:
{code}
    public void transition(JobImpl job, JobEvent event) {
      JobStateInternal finalState = JobStateInternal.valueOf(
          ((JobAbortCompletedEvent) event).getFinalState().name());
      job.unsuccessfulFinish(finalState);
    }
  private void unsuccessfulFinish(JobStateInternal finalState) {
      if (finishTime == 0) setFinishTime();
      cleanupProgress = 1.0f;
      JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
          new JobUnsuccessfulCompletionEvent(oldJobId,
              finishTime,
              succeededMapTaskCount,
              succeededReduceTaskCount,
              finalState.toString(),
              diagnostics);
      eventHandler.handle(new JobHistoryEvent(jobId,
          unsuccessfulJobEvent));
      finished(finalState);
  }
{code}

11.
The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED.
In the following code you can see that JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1:
{code}
      if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
          || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
        try {
          JobUnsuccessfulCompletionEvent jucEvent = 
              (JobUnsuccessfulCompletionEvent) event
              .getHistoryEvent();
          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
          closeEventWriter(event.getJobID());
          processDoneFiles(event.getJobID());
        } catch (IOException e) {
          throw new YarnRuntimeException(e);
        }
      }
{code}

The bad job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist".
From the file name you can see that submitTime is -1 while finishTime is 1423572836007 and jobStartTime is 1423572836007.
jobStartTime is not -1 and equals finishTime because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
{code}
    //JobStartTime
    if (indexInfo.getJobStartTime() >= 0) {
      sb.append(indexInfo.getJobStartTime());
    } else {
      sb.append(indexInfo.getFinishTime());
    }
{code}
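
For comparison, submitTime gets no such fallback, so the -1 value goes straight into the file name. A minimal sketch of one possible guard (our own assumption, not a committed patch), mirroring the jobStartTime handling above, would be:
{code}
    //SubmitTime: hypothetical guard, sketched here for illustration only;
    //this mirrors the jobStartTime fallback and is not the actual Hadoop code.
    if (indexInfo.getSubmitTime() >= 0) {
      sb.append(indexInfo.getSubmitTime());
    } else {
      sb.append(indexInfo.getFinishTime());
    }
{code}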



> -1 submit time cause IllegalArgumentException when parse the Job history file name and JOB_INIT_FAILED cause -1 submit time in JobIndexInfo.
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6259
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: jobhistoryserver
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)