You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/19 09:42:48 UTC

[flink] 02/02: [FLINK-12916][tests] Rework regex

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0923209936dfa4988f7b7681120fb0782122cce3
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Jul 16 12:12:36 2019 +0200

    [FLINK-12916][tests] Rework regex
---
 .../restore/AbstractOperatorRestoreTestBase.java    | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index d1d5e1b..154477e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -49,6 +50,8 @@ import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -67,11 +70,13 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 	private static final int NUM_SLOTS_PER_TM = 4;
 	private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
 	private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern
-		.compile(
-			"(was not running)" +
-				"|(Not all required tasks are currently running)" +
-				"|(Checkpoint was declined \\(tasks not ready\\))" +
-				"|(Task received cancellation from one of its inputs)"
+		.compile(Stream
+			.of("was not running",
+				CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message(),
+				CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY.message(),
+				CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.message())
+			.map(AbstractOperatorRestoreTestBase::escapeRegexCharacters)
+			.collect(Collectors.joining(")|(", "(", ")"))
 		);
 
 	@Rule
@@ -226,4 +231,10 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 	 * @return savepoint directory to use
 	 */
 	protected abstract String getMigrationSavepointName();
+
+	private static String escapeRegexCharacters(String string) {
+		return string
+			.replaceAll("\\(", "\\\\(")
+			.replaceAll("\\)", "\\\\)");
+	}
 }