You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "Junping Du (JIRA)" <ji...@apache.org> on 2017/01/06 00:58:00 UTC

[jira] [Updated] (MAPREDUCE-6259) IllegalArgumentException due to missing job submit time

     [ https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junping Du updated MAPREDUCE-6259:
----------------------------------
    Fix Version/s: 2.8.0

> IllegalArgumentException due to missing job submit time
> -------------------------------------------------------
>
>                 Key: MAPREDUCE-6259
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: jobhistoryserver
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>             Fix For: 2.8.0, 2.7.1, 3.0.0-alpha1
>
>         Attachments: MAPREDUCE-6259.000.patch
>
>
> -1 job submit time cause IllegalArgumentException when parse the Job history file name and JOB_INIT_FAILED cause -1 job submit time in JobIndexInfo.
> We found the following job history file name which cause IllegalArgumentException when parse the job status in the job history file name.
> {code}
> job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
> {code}
> The stack trace for the IllegalArgumentException is
> {code}
> 2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
> java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
> 	at java.lang.Enum.valueOf(Enum.java:236)
> 	at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
> 	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
> 	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
> 	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
> 	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
> 	at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
> 	at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
> 	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
> 	at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
> 	at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
> 	at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
> 	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> 	at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
> 	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> 	at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
> 	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
> 	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
> 	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
> 	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
> 	at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
> 	at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
> 	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
> 	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
> 	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
> 	at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
> 	at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
> 	at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
> 	at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
> 	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> 	at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
> 	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> 	at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
> 	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> 	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
> 	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> 	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
> 	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> 	at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
> 	at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
> 	at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
> 	at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
> 	at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
> 	at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
> 	at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
> 	at org.mortbay.jetty.Server.handle(Server.java:326)
> 	at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
> 	at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
> 	at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
> 	at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
> 	at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
> 	at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
> 	at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
> {code}
> when IOException happened in JobImpl#setup, the Job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo will not be changed and the Job submit time will be its [initial value -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185].
> {code}
>       this.jobIndexInfo =
>           new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
>                            queueName);
> {code}
> The following is the sequences to get -1 job submit time:
> 1. 
> a job is created at MRAppMaster#serviceStart and  the new job is at state JobStateInternal.NEW after created
> {code}
>     job = createJob(getConfig(), forcedState, shutDownMessage);
> {code}
> 2.
> JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart
> {code}
>       JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
>       // Send init to the job (this does NOT trigger job execution)
>       // This is a synchronous call, not an event through dispatcher. We want
>       // job-init to be done completely here.
>       jobEventDispatcher.handle(initJobEvent);
> {code}
> 3.
> after JobImpl received JobEventType.JOB_INIT, it call InitTransition#transition
> {code}
>           .addTransition
>               (JobStateInternal.NEW,
>               EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
>               JobEventType.JOB_INIT,
>               new InitTransition())
> {code}
> 4.
> then the exception happen from setup(job) in InitTransition#transition before JobSubmittedEvent is handled.
> JobSubmittedEvent will update the job submit time. Due to the exception, the submit time is still the initial value -1.
> This is the code InitTransition#transition
> {code}
> public JobStateInternal transition(JobImpl job, JobEvent event) {
>       job.metrics.submittedJob(job);
>       job.metrics.preparingJob(job);
>       if (job.newApiCommitter) {
>         job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
>       } else {
>         job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
>       }
>       try {
>         setup(job);
>         job.fs = job.getFileSystem(job.conf);
>         //log to job history
>         JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
>               job.conf.get(MRJobConfig.JOB_NAME, "test"), 
>             job.conf.get(MRJobConfig.USER_NAME, "mapred"),
>             job.appSubmitTime,
>             job.remoteJobConfFile.toString(),
>             job.jobACLs, job.queueName,
>             job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
>             job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
>             job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
>             getWorkflowAdjacencies(job.conf),
>             job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
>         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
>         //TODO JH Verify jobACLs, UserName via UGI?
>         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
>         job.numMapTasks = taskSplitMetaInfo.length;
>         job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
>         if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
>           job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
>         } else if (job.numMapTasks == 0) {
>           job.reduceWeight = 0.9f;
>         } else if (job.numReduceTasks == 0) {
>           job.mapWeight = 0.9f;
>         } else {
>           job.mapWeight = job.reduceWeight = 0.45f;
>         }
>         checkTaskLimits();
>         long inputLength = 0;
>         for (int i = 0; i < job.numMapTasks; ++i) {
>           inputLength += taskSplitMetaInfo[i].getInputDataLength();
>         }
>         job.makeUberDecision(inputLength);
>         
>         job.taskAttemptCompletionEvents =
>             new ArrayList<TaskAttemptCompletionEvent>(
>                 job.numMapTasks + job.numReduceTasks + 10);
>         job.mapAttemptCompletionEvents =
>             new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
>         job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
>             job.numMapTasks + job.numReduceTasks + 10);
>         job.allowedMapFailuresPercent =
>             job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
>         job.allowedReduceFailuresPercent =
>             job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
>         // create the Tasks but don't start them yet
>         createMapTasks(job, inputLength, taskSplitMetaInfo);
>         createReduceTasks(job);
>         job.metrics.endPreparingJob(job);
>         return JobStateInternal.INITED;
>       } catch (Exception e) {
>         LOG.warn("Job init failed", e);
>         job.metrics.endPreparingJob(job);
>         job.addDiagnostic("Job init failed : "
>             + StringUtils.stringifyException(e));
>         // Leave job in the NEW state. The MR AM will detect that the state is
>         // not INITED and send a JOB_INIT_FAILED event.
>         return JobStateInternal.NEW;
>       }
>     }
> {code}
> This is the code JobImpl#setup
> {code}
>     protected void setup(JobImpl job) throws IOException {
>       String oldJobIDString = job.oldJobId.toString();
>       String user = 
>         UserGroupInformation.getCurrentUser().getShortUserName();
>       Path path = MRApps.getStagingAreaDir(job.conf, user);
>       if(LOG.isDebugEnabled()) {
>         LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
>       }
>       job.remoteJobSubmitDir =
>           FileSystem.get(job.conf).makeQualified(
>               new Path(path, oldJobIDString));
>       job.remoteJobConfFile =
>           new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
>       // Prepare the TaskAttemptListener server for authentication of Containers
>       // TaskAttemptListener gets the information via jobTokenSecretManager.
>       JobTokenIdentifier identifier =
>           new JobTokenIdentifier(new Text(oldJobIDString));
>       job.jobToken =
>           new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
>       job.jobToken.setService(identifier.getJobId());
>       // Add it to the jobTokenSecretManager so that TaskAttemptListener server
>       // can authenticate containers(tasks)
>       job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
>       LOG.info("Adding job token for " + oldJobIDString
>           + " to jobTokenSecretManager");
>       // If the job client did not setup the shuffle secret then reuse
>       // the job token secret for the shuffle.
>       if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
>         LOG.warn("Shuffle secret key missing from job credentials."
>             + " Using job token secret as shuffle secret.");
>         TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
>             job.jobCredentials);
>       }
>     }
> {code}
> 5.
> Due to the IOException from  JobImpl#setup, the new job is still at state JobStateInternal.NEW
> {code}
>       } catch (Exception e) {
>         LOG.warn("Job init failed", e);
>         job.metrics.endPreparingJob(job);
>         job.addDiagnostic("Job init failed : "
>             + StringUtils.stringifyException(e));
>         // Leave job in the NEW state. The MR AM will detect that the state is
>         // not INITED and send a JOB_INIT_FAILED event.
>         return JobStateInternal.NEW;
>       }
> {code}
> At the following code of MRAppMaster#serviceStart, The MR AM detect the state is not INITED and send a JOB_INIT_FAILED event.
> {code}
>       // If job is still not initialized, an error happened during
>       // initialization. Must complete starting all of the services so failure
>       // events can be processed.
>       initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
>     if (initFailed) {
>       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
>       jobEventDispatcher.handle(initFailedEvent);
>     } else {
>       // All components have started, start the job.
>       startJobs();
>     }
> {code}
> 6.
> After JobImpl receives the JOB_INIT_FAILED, it will call InitFailedTransition#transition and enter state JobStateInternal.FAIL_ABORT
> {code}
>           .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
>               JobEventType.JOB_INIT_FAILED,
>               new InitFailedTransition())
> {code}
> 7.
> JobImpl will send CommitterJobAbortEvent in  InitFailedTransition#transition 
> {code}
>     public void transition(JobImpl job, JobEvent event) {
>         job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
>                 job.jobContext,
>                 org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
>     }
> {code}
> 8.
> CommitterJobAbortEvent will be handled by CommitterEventHandler#handleJobAbort which will send JobAbortCompletedEvent(JobEventType.JOB_ABORT_COMPLETED)
> {code}
>     protected void handleJobAbort(CommitterJobAbortEvent event) {
>       cancelJobCommit();
>       try {
>         committer.abortJob(event.getJobContext(), event.getFinalState());
>       } catch (Exception e) {
>         LOG.warn("Could not abort job", e);
>       }
>       context.getEventHandler().handle(new JobAbortCompletedEvent(
>           event.getJobID(), event.getFinalState()));
>     }
> {code}
> 9.
> After JobImpl receives the JOB_ABORT_COMPLETED, it will call JobAbortCompletedTransition#transition and enter state JobStateInternal.FAILED
> {code}
>           .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
>               JobEventType.JOB_ABORT_COMPLETED,
>               new JobAbortCompletedTransition())
> {code}
> 10.
> JobAbortCompletedTransition#transition will call JobImpl#unsuccessfulFinish which will send JobUnsuccessfulCompletionEvent with finish time.
> {code}
>     public void transition(JobImpl job, JobEvent event) {
>       JobStateInternal finalState = JobStateInternal.valueOf(
>           ((JobAbortCompletedEvent) event).getFinalState().name());
>       job.unsuccessfulFinish(finalState);
>     }
>   private void unsuccessfulFinish(JobStateInternal finalState) {
>       if (finishTime == 0) setFinishTime();
>       cleanupProgress = 1.0f;
>       JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
>           new JobUnsuccessfulCompletionEvent(oldJobId,
>               finishTime,
>               succeededMapTaskCount,
>               succeededReduceTaskCount,
>               finalState.toString(),
>               diagnostics);
>       eventHandler.handle(new JobHistoryEvent(jobId,
>           unsuccessfulJobEvent));
>       finished(finalState);
>   }
> {code}
> 11.
> JobUnsuccessfulCompletionEvent will be handled by JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED
> Based on the following code, you can see the JobIndexInfo#finishTime is set correctly but JobIndexInfo#submitTime and  JobIndexInfo#jobStartTime are still -1.
> {code}
>       if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
>           || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
>         try {
>           JobUnsuccessfulCompletionEvent jucEvent = 
>               (JobUnsuccessfulCompletionEvent) event
>               .getHistoryEvent();
>           mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
>           mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
>           mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
>           mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
>           closeEventWriter(event.getJobID());
>           processDoneFiles(event.getJobID());
>         } catch (IOException e) {
>           throw new YarnRuntimeException(e);
>         }
>       }
> {code}
> The error job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist"
> Based on the filename, you can see submitTime is -1, finishTime is 1423572836007 and jobStartTime is 1423572836007.
> The jobStartTime is not -1, and  jobStartTime is the same as  finishTime.
> It is because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
> {code}
>     //JobStartTime
>     if (indexInfo.getJobStartTime() >= 0) {
>       sb.append(indexInfo.getJobStartTime());
>     } else {
>       sb.append(indexInfo.getFinishTime());
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-help@hadoop.apache.org