You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/09 08:12:21 UTC

[GitHub] asfgit closed pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy

asfgit closed pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy
URL: https://github.com/apache/flink/pull/6739
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
index 61a9064ccbb..45ef760d25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.execution;
 
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
 /**
  * Exception thrown in order to suppress job restarts.
  *
@@ -25,6 +28,7 @@
  * job restarts. The JobManager will <strong>not</strong> restart a job, which
  * fails with this Exception.
  */
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class SuppressRestartsException extends RuntimeException {
 
 	private static final long serialVersionUID = 221873676920848349L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e1c1657af44..bea7b8a06bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
 
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class NoResourceAvailableException extends JobException {
 
 	private static final long serialVersionUID = -2249953165298717803L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
new file mode 100644
index 00000000000..86733501fbc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares the {@link ThrowableType} of the annotated {@link Throwable} class; it is retained at runtime (readable via reflection) and, being {@code @Inherited}, applies to subclasses too.
+ */
+@Inherited
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ThrowableAnnotation {
+
+	/** The {@link ThrowableType} of the annotated class; defaults to {@link ThrowableType#RecoverableError}. */
+	ThrowableType value() default ThrowableType.RecoverableError;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
new file mode 100644
index 00000000000..087d6d4e034
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.Annotation;
+
+/** Static, non-instantiable helper that classifies a {@link Throwable} by the {@link ThrowableAnnotation} on its class. */
+public final class ThrowableClassifier {
+
+	/**
+	 * Classifies the given throwable by the {@link ThrowableAnnotation} of its runtime class (including one inherited from a superclass).
+	 * @param cause the throwable to classify; must not be null (a null cause results in a NullPointerException)
+	 * @return the annotated ThrowableType, or ThrowableType.RecoverableError if there is no such annotation
+	 */
+	public static ThrowableType getThrowableType(Throwable cause) {
+		final Annotation annotation = cause.getClass().getAnnotation(ThrowableAnnotation.class);
+		return annotation == null ? ThrowableType.RecoverableError : ((ThrowableAnnotation) annotation).value();
+	}
+
+	private ThrowableClassifier() {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
new file mode 100644
index 00000000000..5fb9a8efdad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/** Categories of a {@link Throwable}, as declared on its class via {@link ThrowableAnnotation}.
+ */
+public enum ThrowableType {
+
+	/**
+	 * Indicates an error that would not succeed even with retry, such as a division by zero ({@link ArithmeticException}).
+	 * No failover should be attempted with such an error. Instead, the job should fail immediately.
+	 */
+	NonRecoverableError,
+
+	/**
+	 * Indicates a data consumption error, in which case the producer should be revoked.
+	 */
+	PartitionDataMissingError,
+
+	/**
+	 * Indicates an error related to the running environment, such as a hardware error or a service issue, in which case blacklisting the machine should be considered.
+	 */
+	EnvironmentError,
+
+	/**
+	 * Indicates any other, recoverable error. This is also the type assigned to throwables without a {@link ThrowableAnnotation}.
+	 */
+	RecoverableError
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
new file mode 100644
index 00000000000..57b330e3997
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ThrowableClassifier}.
+ */
+public class ThrowableClassifierTest extends TestLogger {
+
+	@Test
+	public void testThrowableType_NonRecoverable() {
+		assertEquals(ThrowableType.NonRecoverableError,
+			ThrowableClassifier.getThrowableType(new SuppressRestartsException(new Exception(""))));
+
+		assertEquals(ThrowableType.NonRecoverableError,
+			ThrowableClassifier.getThrowableType(new NoResourceAvailableException()));
+	}
+
+	@Test
+	public void testThrowableType_Recoverable() {
+		assertEquals(ThrowableType.RecoverableError,
+			ThrowableClassifier.getThrowableType(new Exception("")));
+		assertEquals(ThrowableType.RecoverableError,
+			ThrowableClassifier.getThrowableType(new ThrowableType_RecoverableFailure_Exception()));
+	}
+
+	@Test
+	public void testThrowableType_EnvironmentError() {
+		assertEquals(ThrowableType.EnvironmentError,
+			ThrowableClassifier.getThrowableType(new ThrowableType_EnvironmentError_Exception()));
+	}
+
+	@Test
+	public void testThrowableType_PartitionDataMissingError() {
+		assertEquals(ThrowableType.PartitionDataMissingError,
+			ThrowableClassifier.getThrowableType(new ThrowableType_PartitionDataMissingError_Exception()));
+	}
+
+	@Test
+	public void testThrowableType_InheritError() {
+		assertEquals(ThrowableType.PartitionDataMissingError,
+			ThrowableClassifier.getThrowableType(new Sub_ThrowableType_PartitionDataMissingError_Exception()));
+	}
+
+	@ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
+	private static class ThrowableType_PartitionDataMissingError_Exception extends Exception {
+	}
+
+	@ThrowableAnnotation(ThrowableType.EnvironmentError)
+	private static class ThrowableType_EnvironmentError_Exception extends Exception {
+	}
+
+	@ThrowableAnnotation(ThrowableType.RecoverableError)
+	private static class ThrowableType_RecoverableFailure_Exception extends Exception {
+	}
+
+	private static class Sub_ThrowableType_PartitionDataMissingError_Exception extends ThrowableType_PartitionDataMissingError_Exception {
+	}
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services