You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:27 UTC
[08/52] [abbrv] flink git commit: [hotfix] [tests] Migrate some test tasks to Java
[hotfix] [tests] Migrate some test tasks to Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/208324d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/208324d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/208324d6
Branch: refs/heads/master
Commit: 208324d67c9d3cfa66a63fc637dfa280dbd3062c
Parents: 67ed642
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:54:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../StackTraceSampleCoordinatorITCase.java | 6 ++-
.../checkpoint/CoordinatorShutdownTest.java | 3 ++
.../ExecutionGraphMetricsTest.java | 4 +-
.../ExecutionGraphRestartTest.java | 23 ++++++------
.../runtime/jobmanager/JobManagerTest.java | 18 +++++----
.../flink/runtime/jobmanager/JobSubmitTest.java | 7 ++--
.../runtime/taskmanager/TaskManagerTest.java | 5 ++-
.../testtasks/BlockingNoOpInvokable.java | 39 ++++++++++++++++++++
.../flink/runtime/testtasks/NoOpInvokable.java | 30 +++++++++++++++
.../runtime/testtasks/WaitingNoOpInvokable.java | 34 +++++++++++++++++
.../TaskManagerLossFailsTasksTest.scala | 3 +-
.../runtime/jobmanager/JobManagerITCase.scala | 1 +
.../apache/flink/runtime/jobmanager/Tasks.scala | 20 ----------
.../JobSubmissionFailsITCase.java | 6 +--
.../JobManagerHACheckpointRecoveryITCase.java | 4 +-
.../JobManagerHAJobGraphRecoveryITCase.java | 4 +-
.../jobmanager/JobManagerFailsITCase.scala | 2 +-
.../taskmanager/TaskManagerFailsITCase.scala | 3 +-
18 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index f31f41f..c9fa547 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -32,14 +32,16 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
@@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
final int parallelism = 1;
final JobVertex task = new JobVertex("Task");
- task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ task.setInvokableClass(BlockingNoOpInvokable.class);
task.setParallelism(parallelism);
jobGraph.addVertex(task);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 777ba9b..bd82d0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -32,7 +32,10 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d8d8e24..0f7e75f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -45,12 +45,12 @@ import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -87,7 +87,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
JobVertex jobVertex = new JobVertex("TestVertex");
jobVertex.setParallelism(parallelism);
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 52bfc96..11f12a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -37,16 +37,17 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
@@ -105,8 +106,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class);
- JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class);
+ JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, NoOpInvokable.class);
+ JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, NoOpInvokable.class);
SlotSharingGroup sharingGroup = new SlotSharingGroup();
groupVertex.setSlotSharingGroup(sharingGroup);
@@ -242,7 +243,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
new InfiniteDelayRestartStrategy());
JobVertex jobVertex = new JobVertex("NoOpInvokable");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
@@ -380,8 +381,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
- JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
+ JobVertex sender = newJobVertex("Task1", 1, NoOpInvokable.class);
+ JobVertex receiver = newJobVertex("Task2", 1, NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -444,7 +445,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+ JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -490,7 +491,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+ JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -539,7 +540,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
scheduler.newInstanceAvailable(instance);
JobVertex sender = new JobVertex("Task");
- sender.setInvokableClass(Tasks.NoOpInvokable.class);
+ sender.setInvokableClass(NoOpInvokable.class);
sender.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -655,7 +656,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class);
+ JobVertex sender = newJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -672,7 +673,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
return new Tuple2<>(eg, instance);
}
- private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) {
+ private static JobVertex newJobVertex(String task1, int numTasks, Class<NoOpInvokable> invokable) {
JobVertex groupVertex = new JobVertex(task1);
groupVertex.setInvokableClass(invokable);
groupVertex.setParallelism(numTasks);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6d8c70b..3c17daf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -83,6 +83,8 @@ import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -171,7 +173,7 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
sender.createAndAddResultDataSet(rid, PIPELINED);
final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
@@ -287,12 +289,12 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.setInvokableClass(NoOpInvokable.class); // just finish
sender.createAndAddResultDataSet(rid, PIPELINED);
final JobVertex sender2 = new JobVertex("Blocking Sender");
sender2.setParallelism(1);
- sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
@@ -378,12 +380,12 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+ sender.setInvokableClass(NoOpInvokable.class); // just finish
sender.createAndAddResultDataSet(rid, PIPELINED);
final JobVertex sender2 = new JobVertex("Blocking Sender");
sender2.setParallelism(1);
- sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
@@ -520,7 +522,7 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
final JobID jid = jobGraph.getJobID();
@@ -624,11 +626,11 @@ public class JobManagerTest {
JobGraph jobGraph = new JobGraph("croissant");
JobVertex jobVertex1 = new JobVertex("cappuccino");
jobVertex1.setParallelism(4);
- jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
JobVertex jobVertex2 = new JobVertex("americano");
jobVertex2.setParallelism(4);
- jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
jobGraph.addVertex(jobVertex1);
jobGraph.addVertex(jobVertex2);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 1b8f0c3..53bd318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;
import org.junit.AfterClass;
@@ -112,7 +113,7 @@ public class JobSubmitTest {
try {
// create a simple job graph
JobVertex jobVertex = new JobVertex("Test Vertex");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jg = new JobGraph("test job", jobVertex);
// request the blob port from the job manager
@@ -176,7 +177,7 @@ public class JobSubmitTest {
}
};
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jg = new JobGraph("test job", jobVertex);
// submit the job
@@ -219,7 +220,7 @@ public class JobSubmitTest {
private JobGraph createSimpleJobGraph() {
JobVertex jobVertex = new JobVertex("Vertex");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
JobGraph jg = new JobGraph("test job", jobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index b06e474..99c1c1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.NetUtils;
@@ -1121,7 +1122,7 @@ public class TaskManagerTest extends TestLogger {
0,
new Configuration(),
new Configuration(),
- Tasks.BlockingNoOpInvokable.class.getName(),
+ BlockingNoOpInvokable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
Collections.<BlobKey>emptyList(),
@@ -1227,7 +1228,7 @@ public class TaskManagerTest extends TestLogger {
// Look for BlockingNoOpInvokable#invoke
for (StackTraceElement elem : trace) {
if (elem.getClassName().equals(
- Tasks.BlockingNoOpInvokable.class.getName())) {
+ BlockingNoOpInvokable.class.getName())) {
assertEquals("invoke", elem.getMethodName());
success = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c9adba8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A task that does nothing but blocks indefinitely, until the executing thread is interrupted.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ final Object o = new Object();
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (o) {
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ o.wait();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
new file mode 100644
index 0000000..fa9949a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes immediately.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
new file mode 100644
index 0000000..de7d59a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes after a short delay of 100 milliseconds.
+ */
+public class WaitingNoOpInvokable extends AbstractInvokable {
+
+ private static final long waitingTime = 100L;
+
+ @Override
+ public void invoke() throws Exception {
+ Thread.sleep(waitingTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 1cbd605..6f833f1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testtasks.NoOpInvokable
import org.apache.flink.util.SerializedValue
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -51,7 +52,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
scheduler.newInstanceAvailable(instance2)
val sender = new JobVertex("Task")
- sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+ sender.setInvokableClass(classOf[NoOpInvokable])
sender.setParallelism(20)
val jobGraph = new JobGraph("Pointwise job", sender)
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0569297..4aa0565 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableExcepti
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.testtasks._
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.mockito.Mockito._
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 87c123a..fabd66b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -25,26 +25,6 @@ import org.apache.flink.types.IntValue
object Tasks {
- class BlockingNoOpInvokable extends AbstractInvokable {
- override def invoke(): Unit = {
- val o = new Object()
- o.synchronized{
- o.wait()
- }
- }
- }
-
- class NoOpInvokable extends AbstractInvokable{
- override def invoke(): Unit = {}
- }
-
- class WaitingNoOpInvokable extends AbstractInvokable{
- val waitingTime = 100L
-
- override def invoke(): Unit = {
- Thread.sleep(waitingTime)
- }
- }
class Sender extends AbstractInvokable{
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 178656d..256b1ae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -63,7 +63,7 @@ public class JobSubmissionFailsITCase {
cluster.start();
final JobVertex jobVertex = new JobVertex("Working job vertex.");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
workingJobGraph = new JobGraph("Working testing job", jobVertex);
}
catch (Exception e) {
@@ -113,7 +113,7 @@ public class JobSubmissionFailsITCase {
public void testExceptionInInitializeOnMaster() {
try {
final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
- failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ failingJobVertex.setInvokableClass(NoOpInvokable.class);
final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 3f08b5a..418aa51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -372,7 +372,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// Blocking JobGraph
JobVertex blockingVertex = new JobVertex("Blocking vertex");
- blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);
JobGraph jobGraph = new JobGraph(blockingVertex);
// Submit the job in detached mode
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bf39c4b..236e922 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -461,7 +461,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
JobGraph jobGraph = new JobGraph("Blocking program");
JobVertex jobVertex = new JobVertex("Blocking Vertex");
- jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
jobGraph.addVertex(jobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index a8bc7a4..5db02d1 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -23,7 +23,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.Acknowledge
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
http://git-wip-us.apache.org/repos/asf/flink/blob/208324d6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 3b39b3f..e141cc2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, BlockingReceiver, NoOpInvokable, Sender}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._