You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/05 11:33:11 UTC

[GitHub] tillrohrmann closed pull request #7022: [BP-1.7][FLINK-10773] Harden resume externalized checkpoint end-to-end test

tillrohrmann closed pull request #7022: [BP-1.7][FLINK-10773] Harden resume externalized checkpoint end-to-end test
URL: https://github.com/apache/flink/pull/7022
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index fb92960bb86..3c8d0ad537f 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -322,6 +322,8 @@ static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) t
 					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
 					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
 
+			private static final long serialVersionUID = -3154419724891779938L;
+
 			@Override
 			public long extractTimestamp(Event element) {
 				return element.getEventTime();
@@ -367,8 +369,8 @@ static boolean isSimulateFailures(ParameterTool pt) {
 		return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), TEST_SIMULATE_FAILURE.defaultValue());
 	}
 
-	static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) {
-		return new ExceptionThrowingFailureMapper<>(
+	static MapFunction<Event, Event> createFailureMapper(ParameterTool pt) {
+		return new FailureMapper<>(
 			pt.getLong(
 				TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
 				TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 70fdade4a1c..b14e2af1b52 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -41,7 +41,7 @@
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -68,7 +68,7 @@
 	private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
 	private static final String TIME_WINDOW_OPER_NAME = "TumblingWindowOperator";
 	private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
-	private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper";
+	private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
 	public static void main(String[] args) throws Exception {
 		final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -145,7 +145,7 @@ public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Col
 
 		if (isSimulateFailures(pt)) {
 			eventStream3 = eventStream3
-				.map(createExceptionThrowingFailureMapper(pt))
+				.map(createFailureMapper(pt))
 				.setParallelism(1)
 				.name(FAILURE_MAPPER_NAME);
 		}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
similarity index 94%
rename from flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
rename to flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
index d758ef5cf0d..a3a1c253fc0 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
@@ -30,7 +30,7 @@
  * of the operator can also be configured. Note that this also takes into account
  * failures that were not triggered by this mapper, e.g. TaskManager failures.
  */
-public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
+public class FailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
 
 	private static final long serialVersionUID = -5286927943454740016L;
 
@@ -41,7 +41,7 @@
 	private long numProcessedRecords;
 	private long numCompleteCheckpoints;
 
-	public ExceptionThrowingFailureMapper(
+	public FailureMapper(
 			long numProcessedRecordsFailureThreshold,
 			long numCompleteCheckpointsFailureThreshold,
 			int maxNumFailures) {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 09c42af8742..bdf6f64c5a8 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -347,6 +347,7 @@ function check_logs_for_exceptions {
    | grep -v "java.lang.Exception: Execution was suspended" \
    | grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
    | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
+   | grep -v "java.lang.Exception: Artificial failure" \
    | grep -ic "exception")
   if [[ ${exception_count} -gt 0 ]]; then
     echo "Found exception in log files:"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services