Posted to issues@flink.apache.org by zjureel <gi...@git.apache.org> on 2017/10/17 04:55:33 UTC
[GitHub] flink pull request #4843: [FLINK-7703] Port JobExceptionsHandler to new REST...
GitHub user zjureel opened a pull request:
https://github.com/apache/flink/pull/4843
[FLINK-7703] Port JobExceptionsHandler to new REST endpoint
## What is the purpose of the change
Port JobExceptionsHandler to the new REST endpoint.
## Brief change log
- *Create JobExceptionsHandler/JobExceptionsInfo/JobExceptionsHeaders classes*
- *Add JobExceptionsHandler to DispatcherRestEndpoint*
## Verifying this change
This change adds tests and can be verified as follows:
- *Add test case JobExceptionsInfoTest for JobExceptionsInfo*
- *Add test case JobExceptionsInfoNoRootTest for JobExceptionsInfo with no root exception*
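
The two cases above (with and without a root exception) amount to a marshalling round trip: write the response out, read it back, and compare. The following is only a minimal sketch of that idea, not the test code in this pull request. It uses a plain `Map` in place of Jackson, and `ExceptionsBody`, `RoundTripSketch`, and the `"root-exception"` key are hypothetical names chosen for illustration; only `"jid"` and `"name"` come from the change itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a response body with one optional part.
// This is not the Flink class; it only mirrors the "optional field" idea.
final class ExceptionsBody {
    final String jobId;
    final String jobName;
    final String rootException; // may be null when the job has no root exception

    ExceptionsBody(String jobId, String jobName, String rootException) {
        this.jobId = Objects.requireNonNull(jobId);
        this.jobName = Objects.requireNonNull(jobName);
        this.rootException = rootException;
    }

    // Writes the optional field only when it is present.
    Map<String, String> marshal() {
        Map<String, String> fields = new HashMap<>();
        fields.put("jid", jobId);
        fields.put("name", jobName);
        if (rootException != null) {
            fields.put("root-exception", rootException); // assumed key name
        }
        return fields;
    }

    // Treats a missing optional field as null (Map.get returns null for absent keys).
    static ExceptionsBody unmarshal(Map<String, String> fields) {
        return new ExceptionsBody(
            fields.get("jid"),
            fields.get("name"),
            fields.get("root-exception"));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ExceptionsBody)) {
            return false;
        }
        ExceptionsBody that = (ExceptionsBody) o;
        return jobId.equals(that.jobId)
            && jobName.equals(that.jobName)
            && Objects.equals(rootException, that.rootException);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, jobName, rootException);
    }
}

final class RoundTripSketch {
    // True when the value read back equals the value written out.
    static boolean survivesRoundTrip(ExceptionsBody original) {
        return original.equals(ExceptionsBody.unmarshal(original.marshal()));
    }

    public static void main(String[] args) {
        System.out.println(survivesRoundTrip(
            new ExceptionsBody("a1", "wordcount", "java.lang.RuntimeException")));
        System.out.println(survivesRoundTrip(
            new ExceptionsBody("a1", "wordcount", null)));
    }
}
```

Comparing every field in `equals` matters here: if only the optional part were compared, a round trip that corrupted the job ID or name would still pass.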
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zjureel/flink FLINK-7703
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4843.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4843
----
commit 30b7281303400047f602fc20bb2dd39a6011d2cd
Author: zjureel <zj...@gmail.com>
Date: 2017-10-17T03:34:07Z
[FLINK-7703] Port JobExceptionsHandler to new REST endpoint
commit 2994bdc7ce5f8985cb48cb72a8063bd910b24148
Author: zjureel <zj...@gmail.com>
Date: 2017-10-17T03:34:27Z
[FLINK-7703] add test case for JobExceptionsInfo
----
---
[GitHub] flink pull request #4843: [FLINK-7703] Port JobExceptionsHandler to new REST...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4843#discussion_r145111385
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobExceptionsHandler}.
+ */
+@JsonSerialize(using = JobExceptionsInfo.Serializer.class)
+@JsonDeserialize(using = JobExceptionsInfo.Deserializer.class)
+public class JobExceptionsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_JOB_ID = "jid";
+ public static final String FIELD_NAME_JOB_NAME = "name";
+ public static final String FIELD_NAME_EXECUTION_EXCEPTIONS = "execution-exceptions";
+
+ private final JobID jobId;
+
+ private final String jobName;
+
+ @Nullable
+ private final ExecutionExceptionsInfo executionExceptionsInfo;
+
+ public JobExceptionsInfo(JobID jobId, String jobName, @Nullable ExecutionExceptionsInfo executionExceptionsInfo) {
+ this.jobId = Preconditions.checkNotNull(jobId);
+ this.jobName = Preconditions.checkNotNull(jobName);
+ this.executionExceptionsInfo = executionExceptionsInfo;
+ }
+
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ @Nullable
+ public ExecutionExceptionsInfo getExecutionExceptionsInfo() {
+ return executionExceptionsInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionsInfo that = (JobExceptionsInfo) o;
+ return Objects.equals(jobId, that.jobId) &&
+ Objects.equals(jobName, that.jobName) &&
+ Objects.equals(executionExceptionsInfo, that.executionExceptionsInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId, jobName, executionExceptionsInfo);
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+
+ /**
+ * Json serializer for the {@link JobExceptionsInfo}.
+ */
+ public static final class Serializer extends StdSerializer<JobExceptionsInfo> {
+
+ private static final long serialVersionUID = 865827967854426549L;
+
+ public Serializer() {
+ super(JobExceptionsInfo.class);
+ }
+
+ @Override
+ public void serialize(
+ JobExceptionsInfo jobExceptionsInfo,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeStartObject();
+
+ jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobExceptionsInfo.getJobId().toString());
+ jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobExceptionsInfo.getJobName());
+
+ if (jobExceptionsInfo.getExecutionExceptionsInfo() != null) {
+ jsonGenerator.writeObjectField(FIELD_NAME_EXECUTION_EXCEPTIONS, jobExceptionsInfo.getExecutionExceptionsInfo());
+ }
+
+ jsonGenerator.writeEndObject();
+ }
+ }
+
+ /**
+ * Json deserializer for the {@link JobExceptionsInfo}.
+ */
+ public static final class Deserializer extends StdDeserializer<JobExceptionsInfo> {
+
+ private static final long serialVersionUID = 3916722126430424592L;
+
+ public Deserializer() {
+ super(JobExceptionsInfo.class);
+ }
+
+ @Override
+ public JobExceptionsInfo deserialize(
+ JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException {
+ JsonNode rootNode = jsonParser.readValueAsTree();
+
+ final JobID jobId = JobID.fromHexString(rootNode.get(FIELD_NAME_JOB_ID).asText());
+ final String jobName = rootNode.get(FIELD_NAME_JOB_NAME).asText();
+
+ final JobExceptionsInfo.ExecutionExceptionsInfo executionExceptionsInfo;
+
+ if (rootNode.has(FIELD_NAME_EXECUTION_EXCEPTIONS)) {
+ executionExceptionsInfo = RestMapperUtils.getStrictObjectMapper().treeToValue(
+ rootNode.get(FIELD_NAME_EXECUTION_EXCEPTIONS), JobExceptionsInfo.ExecutionExceptionsInfo.class);
+ } else {
+ executionExceptionsInfo = null;
+ }
+
+ return new JobExceptionsInfo(jobId, jobName, executionExceptionsInfo);
+ }
+ }
+
+ /**
+ * Nested class to encapsulate the task execution exception.
+ */
+ public static final class ExecutionTaskExceptionInfo {
--- End diff --
rename to ExecutionExceptionInfo
---
[GitHub] flink pull request #4843: [FLINK-7703] Port JobExceptionsHandler to new REST...
Posted by zjureel <gi...@git.apache.org>.
Github user zjureel closed the pull request at:
https://github.com/apache/flink/pull/4843
---
[GitHub] flink issue #4843: [FLINK-7703] Port JobExceptionsHandler to new REST endpoi...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/4843
@zjureel Can you close this PR? I've already merged it.
---
[GitHub] flink issue #4843: [FLINK-7703] Port JobExceptionsHandler to new REST endpoi...
Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:
https://github.com/apache/flink/pull/4843
@zentol Thank you for your review. I think you're right about ExecutionExceptionsInfo and JobExceptionsInfo, and I have already fixed them. Thanks.
---
[GitHub] flink pull request #4843: [FLINK-7703] Port JobExceptionsHandler to new REST...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4843#discussion_r145112112
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobExceptionsHandler}.
+ */
+@JsonSerialize(using = JobExceptionsInfo.Serializer.class)
+@JsonDeserialize(using = JobExceptionsInfo.Deserializer.class)
+public class JobExceptionsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_JOB_ID = "jid";
+ public static final String FIELD_NAME_JOB_NAME = "name";
+ public static final String FIELD_NAME_EXECUTION_EXCEPTIONS = "execution-exceptions";
+
+ private final JobID jobId;
+
+ private final String jobName;
+
+ @Nullable
+ private final ExecutionExceptionsInfo executionExceptionsInfo;
+
+ public JobExceptionsInfo(JobID jobId, String jobName, @Nullable ExecutionExceptionsInfo executionExceptionsInfo) {
--- End diff --
This class doesn't appear to resemble the response that the existing JobExceptionsHandler returns. The ExecutionExceptionsInfo contains everything we need.
What is this class for?
---
[GitHub] flink issue #4843: [FLINK-7703] Port JobExceptionsHandler to new REST endpoi...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/4843
Merging.
---