You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 13:16:37 UTC
[flink] branch master updated: [FLINK-11365][tests] Remove legacy
TaskManagerFailureRecoveryITCase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b6f2f4 [FLINK-11365][tests] Remove legacy TaskManagerFailureRecoveryITCase
7b6f2f4 is described below
commit 7b6f2f487511d3766089ecd495597440f8a45142
Author: boshu Zheng <bl...@gmail.com>
AuthorDate: Thu Jan 31 21:16:30 2019 +0800
[FLINK-11365][tests] Remove legacy TaskManagerFailureRecoveryITCase
---
.../recovery/TaskManagerFailureRecoveryITCase.java | 205 ---------------------
1 file changed, 205 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
deleted file mode 100644
index d99f195..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.pattern.Patterns;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.fail;
-
-/**
- * This test verifies the behavior of the recovery in the case when a TaskManager
- * fails (shut down) in the middle of a job execution.
- *
- * <p>The test works with multiple in-process task managers. Initially, it starts a JobManager
- * and two TaskManagers with 2 slots each. It submits a program with parallelism 4
- * and waits until all tasks are brought up (coordination between the test and the tasks
- * happens via shared blocking queues). It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the new TaskManager.
- */
-@SuppressWarnings("serial")
-public class TaskManagerFailureRecoveryITCase extends TestLogger {
-
- @Test
- public void testRestartWithFailingTaskManager() {
-
- final int parallelism = 4;
-
- LocalFlinkMiniCluster cluster = null;
- ActorSystem additionalSystem = null;
-
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
- config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
-
- config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
- config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s");
- config.setInteger(AkkaOptions.WATCH_THRESHOLD, 20);
-
- cluster = new LocalFlinkMiniCluster(config, false);
-
- cluster.start();
-
- // for the result
- List<Long> resultCollection = new ArrayList<Long>();
-
- final ExecutionEnvironment env = new TestEnvironment(cluster, parallelism, false);
-
- env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
- env.getConfig().disableSysoutLogging();
-
- env.generateSequence(1, 10)
- .map(new FailingMapper<Long>())
- .reduce(new ReduceFunction<Long>() {
- @Override
- public Long reduce(Long value1, Long value2) {
- return value1 + value2;
- }
- })
- .output(new LocalCollectionOutputFormat<Long>(resultCollection));
-
- // simple reference (atomic does not matter) to pass back an exception from the trigger thread
- final AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
-
- // trigger the execution from a separate thread, so we are available to temper with the
- // cluster during the execution
- Thread trigger = new Thread("program trigger") {
- @Override
- public void run() {
- try {
- env.execute();
- }
- catch (Throwable t) {
- ref.set(t);
- }
- }
- };
- trigger.setDaemon(true);
- trigger.start();
-
- // block until all the mappers are actually deployed
- // the mappers in turn are waiting
- for (int i = 0; i < parallelism; i++) {
- FailingMapper.TASK_TO_COORD_QUEUE.take();
- }
-
- // bring up one more task manager and wait for it to appear
- {
- additionalSystem = cluster.startTaskManagerActorSystem(2);
- ActorRef additionalTaskManager = cluster.startTaskManager(2, additionalSystem);
- Object message = TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage();
- Future<Object> future = Patterns.ask(additionalTaskManager, message, 30000);
-
- try {
- Await.result(future, new FiniteDuration(30000, TimeUnit.MILLISECONDS));
- }
- catch (TimeoutException e) {
- fail ("The additional TaskManager did not come up within 30 seconds");
- }
- }
-
- // kill the two other TaskManagers
- for (ActorRef tm : cluster.getTaskManagersAsJava()) {
- tm.tell(PoisonPill.getInstance(), null);
- }
-
- // wait for the next set of mappers (the recovery ones) to come online
- for (int i = 0; i < parallelism; i++) {
- FailingMapper.TASK_TO_COORD_QUEUE.take();
- }
-
- // tell the mappers that they may continue this time
- for (int i = 0; i < parallelism; i++) {
- FailingMapper.COORD_TO_TASK_QUEUE.add(new Object());
- }
-
- // wait for the program to finish
- trigger.join();
- if (ref.get() != null) {
- Throwable t = ref.get();
- t.printStackTrace();
- fail("Program execution caused an exception: " + t.getMessage());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- if (additionalSystem != null) {
- additionalSystem.shutdown();
- }
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
-
- private static class FailingMapper<T> extends RichMapFunction<T, T> {
- private static final long serialVersionUID = 4435412404173331157L;
-
- private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
-
- private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
-
- @Override
- public void open(Configuration parameters) throws Exception {
- TASK_TO_COORD_QUEUE.add(new Object());
- COORD_TO_TASK_QUEUE.take();
- }
-
- @Override
- public T map(T value) throws Exception {
- return value;
- }
- }
-}