You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/18 10:51:19 UTC

[2/4] flink git commit: [FLINK-7703] Port JobExceptionsHandler to new REST endpoint

[FLINK-7703] Port JobExceptionsHandler to new REST endpoint

This closes #4834.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb528a11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb528a11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb528a11

Branch: refs/heads/master
Commit: cb528a114b4f4bac04620f0dd6aeead773de0d0e
Parents: e14f2db
Author: zjureel <zj...@gmail.com>
Authored: Wed Oct 18 11:39:49 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 18 12:51:00 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../rest/handler/job/JobExceptionsHandler.java  | 100 +++++++++++++
 .../rest/messages/JobExceptionsHeaders.java     |  70 +++++++++
 .../rest/messages/JobExceptionsInfo.java        | 141 +++++++++++++++++++
 .../messages/JobExceptionsInfoNoRootTest.java   |  52 +++++++
 .../rest/messages/JobExceptionsInfoTest.java    |  52 +++++++
 6 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c23bb98..ac4897b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -53,6 +54,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
@@ -207,6 +209,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executor,
 			checkpointStatsCache);
 
+		JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			JobExceptionsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -233,6 +243,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
 		handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
 		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
+		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
 
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
new file mode 100644
index 0000000..feabbea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler serving the job exceptions.
+ */
+public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> {
+
+	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+	public JobExceptionsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+	}
+
+	@Override
+	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+		ErrorInfo rootException = executionGraph.getFailureCause();
+		String rootExceptionMessage = null;
+		Long rootTimestamp = null;
+		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+			rootExceptionMessage = rootException.getExceptionAsString();
+			rootTimestamp = rootException.getTimestamp();
+		}
+
+		List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
+		boolean truncated = false;
+		for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
+			String t = task.getFailureCauseAsString();
+			if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+				if (taskExceptionList.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
+					truncated = true;
+					break;
+				}
+
+				TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
+				String locationString = location != null ?
+					location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
+				long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
+				taskExceptionList.add(new JobExceptionsInfo.ExecutionExceptionInfo(
+					t,
+					task.getTaskNameWithSubtaskIndex(),
+					locationString,
+					timestamp == 0 ? -1 : timestamp));
+			}
+		}
+
+		return new JobExceptionsInfo(rootExceptionMessage, rootTimestamp, taskExceptionList, truncated);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
new file mode 100644
index 0000000..7b924b3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobExceptionsHandler}.
+ */
+public class JobExceptionsHeaders implements MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> {
+
+	private static final JobExceptionsHeaders INSTANCE = new JobExceptionsHeaders();
+
+	public static final String URL = "/jobs/:jobid/exceptions";
+
+	private JobExceptionsHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobExceptionsInfo> getResponseClass() {
+		return JobExceptionsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobExceptionsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
new file mode 100644
index 0000000..83b1134
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobExceptionsHandler}.
+ */
+public class JobExceptionsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
+	public static final String FIELD_NAME_TIMESTAMP = "timestamp";
+	public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
+	public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+	@JsonProperty(FIELD_NAME_ROOT_EXCEPTION)
+	private final String rootException;
+
+	@JsonProperty(FIELD_NAME_TIMESTAMP)
+	private final Long rootTimestamp;
+
+	@JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
+	private final List<ExecutionExceptionInfo> allExceptions;
+
+	@JsonProperty(FIELD_NAME_TRUNCATED)
+	private final boolean truncated;
+
+	@JsonCreator
+	public JobExceptionsInfo(
+		@JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
+		@JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
+		@JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List<ExecutionExceptionInfo> allExceptions,
+		@JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+		this.rootException = rootException;
+		this.rootTimestamp = rootTimestamp;
+		this.allExceptions = Preconditions.checkNotNull(allExceptions);
+		this.truncated = truncated;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobExceptionsInfo that = (JobExceptionsInfo) o;
+		return truncated == that.truncated &&
+			Objects.equals(rootException, that.rootException) &&
+			Objects.equals(rootTimestamp, that.rootTimestamp) &&
+			Objects.equals(allExceptions, that.allExceptions);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(rootException, rootTimestamp, allExceptions, truncated);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Nested class to encapsulate the task execution exception.
+	 */
+	public static final class ExecutionExceptionInfo {
+		public static final String FIELD_NAME_EXCEPTION = "exception";
+		public static final String FIELD_NAME_TASK = "task";
+		public static final String FIELD_NAME_LOCATION = "location";
+		public static final String FIELD_NAME_TIMESTAMP = "timestamp";
+
+		@JsonProperty(FIELD_NAME_EXCEPTION)
+		private final String exception;
+
+		@JsonProperty(FIELD_NAME_TASK)
+		private final String task;
+
+		@JsonProperty(FIELD_NAME_LOCATION)
+		private final String location;
+
+		@JsonProperty(FIELD_NAME_TIMESTAMP)
+		private final long timestamp;
+
+		@JsonCreator
+		public ExecutionExceptionInfo(
+			@JsonProperty(FIELD_NAME_EXCEPTION) String exception,
+			@JsonProperty(FIELD_NAME_TASK) String task,
+			@JsonProperty(FIELD_NAME_LOCATION) String location,
+			@JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp) {
+			this.exception = Preconditions.checkNotNull(exception);
+			this.task = Preconditions.checkNotNull(task);
+			this.location = Preconditions.checkNotNull(location);
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			JobExceptionsInfo.ExecutionExceptionInfo that = (JobExceptionsInfo.ExecutionExceptionInfo) o;
+			return timestamp == that.timestamp &&
+				Objects.equals(exception, that.exception) &&
+				Objects.equals(task, that.task) &&
+				Objects.equals(location, that.location);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(timestamp, exception, task, location);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
new file mode 100644
index 0000000..a3c23f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobExceptionsInfo} with no root exception can be marshalled and unmarshalled.
+ */
+public class JobExceptionsInfoNoRootTest extends RestResponseMarshallingTestBase<JobExceptionsInfo> {
+	@Override
+	protected Class<JobExceptionsInfo> getTestResponseClass() {
+		return JobExceptionsInfo.class;
+	}
+
+	@Override
+	protected JobExceptionsInfo getTestResponseInstance() throws Exception {
+		List<JobExceptionsInfo.ExecutionExceptionInfo> executionTaskExceptionInfoList = new ArrayList<>();
+		executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo(
+			"exception1",
+			"task1",
+			"location1",
+			System.currentTimeMillis()));
+		executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo(
+			"exception2",
+			"task2",
+			"location2",
+			System.currentTimeMillis()));
+		return new JobExceptionsInfo(
+			null,
+			null,
+			executionTaskExceptionInfoList,
+			false);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
new file mode 100644
index 0000000..b8f3baa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobExceptionsInfo} can be marshalled and unmarshalled.
+ */
+public class JobExceptionsInfoTest extends RestResponseMarshallingTestBase<JobExceptionsInfo>  {
+	@Override
+	protected Class<JobExceptionsInfo> getTestResponseClass() {
+		return JobExceptionsInfo.class;
+	}
+
+	@Override
+	protected JobExceptionsInfo getTestResponseInstance() throws Exception {
+		List<JobExceptionsInfo.ExecutionExceptionInfo> executionTaskExceptionInfoList = new ArrayList<>();
+		executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo(
+			"exception1",
+			"task1",
+			"location1",
+			System.currentTimeMillis()));
+		executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo(
+			"exception2",
+			"task2",
+			"location2",
+			System.currentTimeMillis()));
+		return new JobExceptionsInfo(
+			"root exception",
+			System.currentTimeMillis(),
+			executionTaskExceptionInfoList,
+			false);
+	}
+}