Posted to mapreduce-issues@hadoop.apache.org by "Junping Du (JIRA)" <ji...@apache.org> on 2017/01/06 00:58:00 UTC
[jira] [Updated] (MAPREDUCE-6259) IllegalArgumentException due to
missing job submit time
[ https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Junping Du updated MAPREDUCE-6259:
----------------------------------
Fix Version/s: 2.8.0
> IllegalArgumentException due to missing job submit time
> -------------------------------------------------------
>
> Key: MAPREDUCE-6259
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: jobhistoryserver
> Reporter: zhihai xu
> Assignee: zhihai xu
> Fix For: 2.8.0, 2.7.1, 3.0.0-alpha1
>
> Attachments: MAPREDUCE-6259.000.patch
>
>
> A -1 job submit time causes an IllegalArgumentException when the job history file name is parsed, and JOB_INIT_FAILED leaves a -1 job submit time in JobIndexInfo.
> We found the following job history file name, which causes an IllegalArgumentException when the job status is parsed out of the file name.
> {code}
> job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
> {code}
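For illustration, the parse failure can be reproduced outside Hadoop. The history server splits the file name on the "-" delimiter (field order: jobId, submitTime, user, jobName, finishTime, numMaps, numReduces, status, queueName, jobStartTime), so the leading "-" of the -1 submit time acts as an extra delimiter and shifts every later field one position to the right. A minimal sketch, using a simplified job name and ignoring the percent-escaping that the real FileNameIndexUtils applies:

```java
public class JhistNameSplitDemo {
    public static void main(String[] args) {
        // Simplified .jhist file name whose second field (submitTime) is -1.
        String name = "job_1418398645407_115853--1-worun-jobname"
            + "-1423572836007-0-0-FAILED-root.journaling-1423572836007";
        String[] parts = name.split("-");

        // The "-" of "-1" is consumed as a delimiter, leaving an empty
        // submitTime field and shifting the remaining fields right by one:
        System.out.println(parts[1]); // empty string instead of the submit time
        System.out.println(parts[7]); // "0" lands where the status should be,
                                      // hence "No enum constant JobState.0"
    }
}
```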
> The stack trace for the IllegalArgumentException is
> {code}
> 2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
> java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
> at java.lang.Enum.valueOf(Enum.java:236)
> at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
> at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
> at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
> at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
> at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
> at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
> at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
> at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
> at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
> at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
> at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
> at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
> at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
> at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
> at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
> at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
> at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
> at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
> at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
> at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
> at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
> at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
> at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
> at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
> at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
> at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
> at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
> at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
> at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
> at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
> at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
> at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
> at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
> at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
> at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
> at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
> at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
> at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
> at org.mortbay.jetty.Server.handle(Server.java:326)
> at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
> at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
> at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
> at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
> at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
> at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
> at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
> {code}
> When an IOException happens in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated and keeps its [initial value -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185].
> {code}
> this.jobIndexInfo =
>     new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
>         queueName);
> {code}
> The following is the sequence that produces the -1 job submit time:
> 1.
> A job is created in MRAppMaster#serviceStart; after creation the new job is in state JobStateInternal.NEW
> {code}
> job = createJob(getConfig(), forcedState, shutDownMessage);
> {code}
> 2.
> JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart
> {code}
> JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
> // Send init to the job (this does NOT trigger job execution)
> // This is a synchronous call, not an event through dispatcher. We want
> // job-init to be done completely here.
> jobEventDispatcher.handle(initJobEvent);
> {code}
> 3.
> After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition
> {code}
> .addTransition
>     (JobStateInternal.NEW,
>     EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
>     JobEventType.JOB_INIT,
>     new InitTransition())
> {code}
> 4.
> Then the exception is thrown from setup(job) in InitTransition#transition, before the JobSubmittedEvent is handled.
> JobSubmittedEvent would have updated the job submit time; because of the exception, the submit time stays at its initial value -1.
> This is the code of InitTransition#transition:
> {code}
> public JobStateInternal transition(JobImpl job, JobEvent event) {
>   job.metrics.submittedJob(job);
>   job.metrics.preparingJob(job);
>   if (job.newApiCommitter) {
>     job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
>   } else {
>     job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
>   }
>   try {
>     setup(job);
>     job.fs = job.getFileSystem(job.conf);
>     //log to job history
>     JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
>         job.conf.get(MRJobConfig.JOB_NAME, "test"),
>         job.conf.get(MRJobConfig.USER_NAME, "mapred"),
>         job.appSubmitTime,
>         job.remoteJobConfFile.toString(),
>         job.jobACLs, job.queueName,
>         job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
>         job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
>         job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
>         getWorkflowAdjacencies(job.conf),
>         job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
>     job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
>     //TODO JH Verify jobACLs, UserName via UGI?
>     TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
>     job.numMapTasks = taskSplitMetaInfo.length;
>     job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
>     if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
>       job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
>     } else if (job.numMapTasks == 0) {
>       job.reduceWeight = 0.9f;
>     } else if (job.numReduceTasks == 0) {
>       job.mapWeight = 0.9f;
>     } else {
>       job.mapWeight = job.reduceWeight = 0.45f;
>     }
>     checkTaskLimits();
>     long inputLength = 0;
>     for (int i = 0; i < job.numMapTasks; ++i) {
>       inputLength += taskSplitMetaInfo[i].getInputDataLength();
>     }
>     job.makeUberDecision(inputLength);
>
>     job.taskAttemptCompletionEvents =
>         new ArrayList<TaskAttemptCompletionEvent>(
>             job.numMapTasks + job.numReduceTasks + 10);
>     job.mapAttemptCompletionEvents =
>         new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
>     job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
>         job.numMapTasks + job.numReduceTasks + 10);
>     job.allowedMapFailuresPercent =
>         job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
>     job.allowedReduceFailuresPercent =
>         job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
>     // create the Tasks but don't start them yet
>     createMapTasks(job, inputLength, taskSplitMetaInfo);
>     createReduceTasks(job);
>     job.metrics.endPreparingJob(job);
>     return JobStateInternal.INITED;
>   } catch (Exception e) {
>     LOG.warn("Job init failed", e);
>     job.metrics.endPreparingJob(job);
>     job.addDiagnostic("Job init failed : "
>         + StringUtils.stringifyException(e));
>     // Leave job in the NEW state. The MR AM will detect that the state is
>     // not INITED and send a JOB_INIT_FAILED event.
>     return JobStateInternal.NEW;
>   }
> }
> {code}
> This is the code of JobImpl#setup:
> {code}
> protected void setup(JobImpl job) throws IOException {
>   String oldJobIDString = job.oldJobId.toString();
>   String user =
>       UserGroupInformation.getCurrentUser().getShortUserName();
>   Path path = MRApps.getStagingAreaDir(job.conf, user);
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
>   }
>   job.remoteJobSubmitDir =
>       FileSystem.get(job.conf).makeQualified(
>           new Path(path, oldJobIDString));
>   job.remoteJobConfFile =
>       new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
>   // Prepare the TaskAttemptListener server for authentication of Containers
>   // TaskAttemptListener gets the information via jobTokenSecretManager.
>   JobTokenIdentifier identifier =
>       new JobTokenIdentifier(new Text(oldJobIDString));
>   job.jobToken =
>       new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
>   job.jobToken.setService(identifier.getJobId());
>   // Add it to the jobTokenSecretManager so that TaskAttemptListener server
>   // can authenticate containers(tasks)
>   job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
>   LOG.info("Adding job token for " + oldJobIDString
>       + " to jobTokenSecretManager");
>   // If the job client did not setup the shuffle secret then reuse
>   // the job token secret for the shuffle.
>   if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
>     LOG.warn("Shuffle secret key missing from job credentials."
>         + " Using job token secret as shuffle secret.");
>     TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
>         job.jobCredentials);
>   }
> }
> {code}
> 5.
> Due to the IOException from JobImpl#setup, the new job is still in state JobStateInternal.NEW
> {code}
> } catch (Exception e) {
>   LOG.warn("Job init failed", e);
>   job.metrics.endPreparingJob(job);
>   job.addDiagnostic("Job init failed : "
>       + StringUtils.stringifyException(e));
>   // Leave job in the NEW state. The MR AM will detect that the state is
>   // not INITED and send a JOB_INIT_FAILED event.
>   return JobStateInternal.NEW;
> }
> {code}
> In the following code in MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event.
> {code}
> // If job is still not initialized, an error happened during
> // initialization. Must complete starting all of the services so failure
> // events can be processed.
> initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
> if (initFailed) {
>   JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
>   jobEventDispatcher.handle(initFailedEvent);
> } else {
>   // All components have started, start the job.
>   startJobs();
> }
> {code}
> 6.
> After JobImpl receives the JOB_INIT_FAILED event, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT
> {code}
> .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
>     JobEventType.JOB_INIT_FAILED,
>     new InitFailedTransition())
> {code}
> 7.
> JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition
> {code}
> public void transition(JobImpl job, JobEvent event) {
>   job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
>       job.jobContext,
>       org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
> }
> {code}
> 8.
> The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED)
> {code}
> protected void handleJobAbort(CommitterJobAbortEvent event) {
>   cancelJobCommit();
>   try {
>     committer.abortJob(event.getJobContext(), event.getFinalState());
>   } catch (Exception e) {
>     LOG.warn("Could not abort job", e);
>   }
>   context.getEventHandler().handle(new JobAbortCompletedEvent(
>       event.getJobID(), event.getFinalState()));
> }
> {code}
> 9.
> After JobImpl receives the JOB_ABORT_COMPLETED event, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED
> {code}
> .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
>     JobEventType.JOB_ABORT_COMPLETED,
>     new JobAbortCompletedTransition())
> {code}
> 10.
> JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent carrying the finish time.
> {code}
> public void transition(JobImpl job, JobEvent event) {
>   JobStateInternal finalState = JobStateInternal.valueOf(
>       ((JobAbortCompletedEvent) event).getFinalState().name());
>   job.unsuccessfulFinish(finalState);
> }
>
> private void unsuccessfulFinish(JobStateInternal finalState) {
>   if (finishTime == 0) setFinishTime();
>   cleanupProgress = 1.0f;
>   JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
>       new JobUnsuccessfulCompletionEvent(oldJobId,
>           finishTime,
>           succeededMapTaskCount,
>           succeededReduceTaskCount,
>           finalState.toString(),
>           diagnostics);
>   eventHandler.handle(new JobHistoryEvent(jobId,
>       unsuccessfulJobEvent));
>   finished(finalState);
> }
> {code}
> 11.
> The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with event type EventType.JOB_FAILED.
> As the following code shows, JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime remain -1.
> {code}
> if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
>     || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
>   try {
>     JobUnsuccessfulCompletionEvent jucEvent =
>         (JobUnsuccessfulCompletionEvent) event
>             .getHistoryEvent();
>     mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
>     mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
>     mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
>     mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
>     closeEventWriter(event.getJobID());
>     processDoneFiles(event.getJobID());
>   } catch (IOException e) {
>     throw new YarnRuntimeException(e);
>   }
> }
> {code}
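The net effect of the sequence above on the index info can be sketched with a hypothetical stand-in class (MiniIndexInfo is illustrative only, not a real Hadoop type): because no JobSubmittedEvent is ever logged on the JOB_INIT_FAILED path, only the completion fields are written, and submitTime keeps the -1 the constructor gave it.

```java
public class IndexInfoDemo {
    // Illustrative stand-in for the relevant JobIndexInfo fields; the real
    // class lives in org.apache.hadoop.mapreduce.v2.jobhistory.
    static class MiniIndexInfo {
        long submitTime = -1;   // updated only when a JobSubmittedEvent arrives
        long finishTime = -1;   // updated by the JOB_FAILED handler
        String jobStatus;
    }

    public static void main(String[] args) {
        MiniIndexInfo mi = new MiniIndexInfo(); // mirrors JobIndexInfo(-1, -1, ...)
        // JOB_INIT_FAILED path: only the unsuccessful-completion handler runs.
        mi.finishTime = 1423572836007L;
        mi.jobStatus = "FAILED";
        System.out.println(mi.submitTime); // still -1, later embedded in the file name
    }
}
```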
> The bad job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist".
> From the file name you can see that submitTime is -1 while finishTime is 1423572836007 and jobStartTime is 1423572836007.
> jobStartTime is not -1, and it equals finishTime, because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
> {code}
> //JobStartTime
> if (indexInfo.getJobStartTime() >= 0) {
> sb.append(indexInfo.getJobStartTime());
> } else {
> sb.append(indexInfo.getFinishTime());
> }
> {code}
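That fallback explains the duplicated timestamp in the observed file name; a minimal stand-alone sketch of the same branch, with the values from this job:

```java
public class StartTimeFallbackDemo {
    public static void main(String[] args) {
        long jobStartTime = -1L;          // never set: init failed before start
        long finishTime = 1423572836007L; // set from JobUnsuccessfulCompletionEvent

        StringBuilder sb = new StringBuilder();
        // Same branch as FileNameIndexUtils#getDoneFileName:
        if (jobStartTime >= 0) {
            sb.append(jobStartTime);
        } else {
            sb.append(finishTime);        // fallback: start time := finish time
        }
        System.out.println(sb);           // 1423572836007
    }
}
```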
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)