You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/09 16:12:05 UTC
[2/2] flink git commit: [FLINK-5748] [jobmanager] Make the 'future
executor' a ScheduledExecutorService
[FLINK-5748] [jobmanager] Make the 'future executor' a ScheduledExecutorService
This closes #3289
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665c7e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665c7e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665c7e39
Branch: refs/heads/master
Commit: 665c7e399928188b22a7963cc05654589d47941c
Parents: 95765b6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 8 20:51:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 9 14:36:35 2017 +0100
----------------------------------------------------------------------
.../MesosApplicationMasterRunner.java | 5 +-
.../clusterframework/MesosJobManager.scala | 4 +-
.../BackPressureStatsTrackerITCase.java | 4 +-
.../StackTraceSampleCoordinatorITCase.java | 4 +-
.../webmonitor/WebRuntimeMonitorITCase.java | 5 +-
.../runtime/executiongraph/ExecutionGraph.java | 11 +-
.../executiongraph/ExecutionGraphBuilder.java | 6 +-
.../runtime/jobmaster/JobManagerServices.java | 12 ++-
.../flink/runtime/jobmaster/JobMaster.java | 3 +-
.../ContaineredJobManager.scala | 4 +-
.../flink/runtime/jobmanager/JobManager.scala | 14 +--
.../runtime/minicluster/FlinkMiniCluster.scala | 4 +-
.../minicluster/LocalFlinkMiniCluster.scala | 4 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 4 +-
.../clusterframework/ClusterShutdownITCase.java | 13 +--
.../clusterframework/ResourceManagerITCase.java | 8 +-
.../ArchivedExecutionGraphTest.java | 4 +-
.../ExecutionGraphConstructionTest.java | 32 +++---
.../ExecutionGraphDeploymentTest.java | 13 +--
.../ExecutionGraphMetricsTest.java | 4 +-
.../ExecutionGraphRestartTest.java | 12 +--
.../ExecutionGraphSignalsTest.java | 4 +-
.../executiongraph/ExecutionGraphTestUtils.java | 9 +-
.../ExecutionStateProgressTest.java | 4 +-
.../ExecutionVertexCancelTest.java | 18 ++--
.../ExecutionVertexDeploymentTest.java | 5 +-
.../ExecutionVertexLocalityTest.java | 6 +-
.../executiongraph/LegacyJobVertexIdTest.java | 3 +-
.../executiongraph/PointwisePatternTest.java | 28 ++---
.../TerminalStateDeadlockTest.java | 4 +-
.../executiongraph/VertexSlotSharingTest.java | 4 +-
.../jobmanager/JobManagerHARecoveryTest.java | 24 ++---
.../runtime/jobmanager/JobManagerTest.java | 20 ++--
.../flink/runtime/jobmanager/JobSubmitTest.java | 5 +-
.../JobManagerLeaderElectionTest.java | 17 ++-
.../runtime/metrics/TaskManagerMetricsTest.java | 5 +-
...askManagerComponentsStartupShutdownTest.java | 6 +-
.../TaskManagerProcessReapingTestBase.java | 5 +-
.../TaskManagerRegistrationTest.java | 12 +--
.../DirectScheduledExecutorService.java | 107 +++++++++++++++++++
.../TaskManagerLossFailsTasksTest.scala | 14 ++-
.../jobmanager/JobManagerRegistrationTest.scala | 8 +-
.../runtime/testingUtils/TestingCluster.scala | 4 +-
.../testingUtils/TestingJobManager.scala | 4 +-
.../runtime/testingUtils/TestingUtils.scala | 86 +++++++++++----
.../partitioner/RescalePartitionerTest.java | 4 +-
...ctTaskManagerProcessFailureRecoveryTest.java | 5 +-
.../recovery/ProcessFailureCancelingITCase.java | 5 +-
.../flink/yarn/TestingYarnJobManager.scala | 4 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 3 +-
.../org/apache/flink/yarn/YarnJobManager.scala | 4 +-
51 files changed, 385 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index de76d8e..5033692 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -72,6 +72,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkState;
@@ -195,7 +196,7 @@ public class MesosApplicationMasterRunner {
ActorSystem actorSystem = null;
WebMonitor webMonitor = null;
MesosArtifactServer artifactServer = null;
- ExecutorService futureExecutor = null;
+ ScheduledExecutorService futureExecutor = null;
ExecutorService ioExecutor = null;
MesosServices mesosServices = null;
@@ -213,7 +214,7 @@ public class MesosApplicationMasterRunner {
// JM configuration
int numberProcessors = Hardware.getNumberCPUCores();
- futureExecutor = Executors.newFixedThreadPool(
+ futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 38886f8..3e7c55f 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -18,7 +18,7 @@
package org.apache.flink.mesos.runtime.clusterframework
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.ActorRef
import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
@@ -51,7 +51,7 @@ import scala.concurrent.duration._
*/
class MesosJobManager(
flinkConfiguration: FlinkConfiguration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 357301a..30a86a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -122,8 +122,8 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
try {
jobManger = TestingUtils.createJobManager(
testActorSystem,
- testActorSystem.dispatcher(),
- testActorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new Configuration());
final Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index c9fa547..a44e212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -94,8 +94,8 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
try {
jobManger = TestingUtils.createJobManager(
testActorSystem,
- testActorSystem.dispatcher(),
- testActorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new Configuration());
final Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index d8bd6af..8af1f46 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
@@ -161,8 +162,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
jobManager[i] = JobManager.startJobManagerActors(
jmConfig,
jobManagerSystem[i],
- jobManagerSystem[i].dispatcher(),
- jobManagerSystem[i].dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index da4a66e..f25120c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -78,6 +78,7 @@ import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -201,7 +202,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private CheckpointStatsTracker checkpointStatsTracker;
/** The executor which is used to execute futures. */
- private final Executor futureExecutor;
+ private final ScheduledExecutorService futureExecutor;
/** The executor which is used to execute blocking io operations */
private final Executor ioExecutor;
@@ -220,7 +221,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* This constructor is for tests only, because it does not include class loading information.
*/
ExecutionGraph(
- Executor futureExecutor,
+ ScheduledExecutorService futureExecutor,
Executor ioExecutor,
JobID jobId,
String jobName,
@@ -237,15 +238,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
serializedConfig,
timeout,
restartStrategy,
- new ArrayList<BlobKey>(),
- new ArrayList<URL>(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
ExecutionGraph.class.getClassLoader(),
new UnregisteredMetricsGroup()
);
}
public ExecutionGraph(
- Executor futureExecutor,
+ ScheduledExecutorService futureExecutor,
Executor ioExecutor,
JobID jobId,
String jobName,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 386f202..c558e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,16 +52,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}.
*/
public class ExecutionGraphBuilder {
+
/**
* Builds the ExecutionGraph from the JobGraph.
* If a prior execution graph exists, the JobGraph will be attached. If no prior execution
- * graph exists, then the JobGraph will become attach to a new emoty execution graph.
+ * graph exists, then the JobGraph will become attach to a new empty execution graph.
*/
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
JobGraph jobGraph,
Configuration jobManagerConfig,
- Executor futureExecutor,
+ ScheduledExecutorService futureExecutor,
Executor ioExecutor,
ClassLoader classLoader,
CheckpointRecoveryFactory recoveryFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index fff75d5..95500e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class JobManagerServices {
- public final ExecutorService executorService;
+ public final ScheduledExecutorService executorService;
public final BlobLibraryCacheManager libraryCacheManager;
@@ -50,7 +52,7 @@ public class JobManagerServices {
public final Time rpcAskTimeout;
public JobManagerServices(
- ExecutorService executorService,
+ ScheduledExecutorService executorService,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout) {
@@ -119,7 +121,7 @@ public class JobManagerServices {
}
return new JobManagerServices(
- new ForkJoinPool(),
+ Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
Time.of(timeout.length(), timeout.unit()));
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d6bcf2c..a318657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -182,7 +183,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityService,
- ExecutorService executorService,
+ ScheduledExecutorService executorService,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index cbe80f1..cd7b363 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.clusterframework
-import java.util.concurrent.Executor
+import java.util.concurrent.{ScheduledExecutorService, Executor}
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
@@ -59,7 +59,7 @@ import scala.language.postfixOps
*/
abstract class ContaineredJobManager(
flinkConfiguration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 81e9fc4..d575f68 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -117,7 +117,7 @@ import scala.language.postfixOps
*/
class JobManager(
protected val flinkConfiguration: Configuration,
- protected val futureExecutor: Executor,
+ protected val futureExecutor: ScheduledExecutorService,
protected val ioExecutor: Executor,
protected val instanceManager: InstanceManager,
protected val scheduler: FlinkScheduler,
@@ -2021,7 +2021,7 @@ object JobManager {
val numberProcessors = Hardware.getNumberCPUCores()
- val futureExecutor = Executors.newFixedThreadPool(
+ val futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
new NamedThreadFactory("jobmanager-future-", "-thread-"))
@@ -2189,7 +2189,7 @@ object JobManager {
executionMode: JobManagerMode,
externalHostname: String,
port: Int,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist],
@@ -2466,7 +2466,7 @@ object JobManager {
*/
def createJobManagerComponents(
configuration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
leaderElectionServiceOption: Option[LeaderElectionService]) :
(InstanceManager,
@@ -2601,7 +2601,7 @@ object JobManager {
def startJobManagerActors(
configuration: Configuration,
actorSystem: ActorSystem,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist])
@@ -2637,7 +2637,7 @@ object JobManager {
def startJobManagerActors(
configuration: Configuration,
actorSystem: ActorSystem,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
jobManagerActorName: Option[String],
archiveActorName: Option[String],
@@ -2707,7 +2707,7 @@ object JobManager {
def getJobManagerProps(
jobManagerClass: Class[_ <: JobManager],
configuration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 88d7b3a..64cc97d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -107,13 +107,13 @@ abstract class FlinkMiniCluster(
private var isRunning = false
- val futureExecutor = Executors.newFixedThreadPool(
+ val futureExecutor = Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new NamedThreadFactory("mini-cluster-future-", "-thread"))
val ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
- new NamedThreadFactory("mini-cluster-future-", "-thread"))
+ new NamedThreadFactory("mini-cluster-io-", "-thread"))
def configuration: Configuration = {
if (originalConfiguration.getInteger(
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 4ec655e..adace0b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.minicluster
import java.net.InetAddress
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.flink.api.common.JobID
@@ -254,7 +254,7 @@ class LocalFlinkMiniCluster(
def getJobManagerProps(
jobManagerClass: Class[_ <: JobManager],
configuration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index bc95de7..47e6826 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -93,8 +93,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
CheckpointIDCounter counter,
CompletedCheckpointStore store) throws Exception {
ExecutionGraph executionGraph = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"test",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
index 04839af..41018dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.clusterframework;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -31,11 +31,12 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import scala.Option;
+import scala.Option;
/**
* Runs tests to ensure that a cluster is shutdown properly.
@@ -74,8 +75,8 @@ public class ClusterShutdownITCase extends TestLogger {
ActorGateway jobManager =
TestingUtils.createJobManager(
system,
- system.dispatcher(),
- system.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
config,
"jobmanager1");
@@ -121,8 +122,8 @@ public class ClusterShutdownITCase extends TestLogger {
ActorGateway jobManager =
TestingUtils.createJobManager(
system,
- system.dispatcher(),
- system.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
config,
"jobmanager2");
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
index d4ff03e..6191195 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -77,8 +77,8 @@ public class ResourceManagerITCase extends TestLogger {
ActorGateway jobManager =
TestingUtils.createJobManager(
system,
- system.dispatcher(),
- system.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
config,
"ReconciliationTest");
ActorGateway me =
@@ -136,8 +136,8 @@ public class ResourceManagerITCase extends TestLogger {
ActorGateway jobManager =
TestingUtils.createJobManager(
system,
- system.dispatcher(),
- system.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
config,
"RegTest");
ActorGateway me =
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 9b1064d..46ce3f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -96,8 +96,8 @@ public class ArchivedExecutionGraphTest {
config.setGlobalJobParameters(new TestJobParameters());
runtimeGraph = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"test job",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index bf3a17c..aed1095 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,8 +112,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -162,8 +162,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -237,8 +237,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -497,8 +497,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -562,8 +562,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -631,8 +631,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -678,8 +678,8 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -760,8 +760,8 @@ public class ExecutionGraphConstructionTest {
JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index ef4f74c..6b05987 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
@@ -89,8 +90,8 @@ public class ExecutionGraphDeploymentTest {
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
"some job",
new Configuration(),
@@ -313,8 +314,8 @@ public class ExecutionGraphDeploymentTest {
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.directExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ new DirectScheduledExecutorService(),
+ TestingUtils.defaultExecutor(),
jobId,
"failing test job",
new Configuration(),
@@ -358,8 +359,8 @@ public class ExecutionGraphDeploymentTest {
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.directExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ new DirectScheduledExecutorService(),
+ TestingUtils.defaultExecutor(),
jobId,
"some job",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 41e47d2..6b28984 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -66,8 +66,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -82,7 +82,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
*/
@Test
public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
- final ExecutorService executor = Executors.newCachedThreadPool();
+ final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
// setup execution graph with mocked scheduling logic
int parallelism = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 11f12a5..e4f49bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -232,8 +232,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
// Blocking program
ExecutionGraph executionGraph = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"TestJob",
new Configuration(),
@@ -548,8 +548,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"Test job",
new Configuration(),
@@ -682,8 +682,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
return new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"Test job",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index fde967e..b7850fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -132,8 +132,8 @@ public class ExecutionGraphSignalsTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 71ae3b6..ef94917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.spy;
import java.lang.reflect.Field;
import java.net.InetAddress;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
@@ -168,7 +168,10 @@ public class ExecutionGraphTestUtils {
public static final String ERROR_MESSAGE = "test_failure_error_message";
- public static ExecutionJobVertex getExecutionVertex(JobVertexID id, Executor executor) throws Exception {
+ public static ExecutionJobVertex getExecutionVertex(
+ JobVertexID id, ScheduledExecutorService executor)
+ throws Exception {
+
JobVertex ajv = new JobVertex("TestVertex", id);
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
@@ -200,6 +203,6 @@ public class ExecutionGraphTestUtils {
}
public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
- return getExecutionVertex(id, TestingUtils.defaultExecutionContext());
+ return getExecutionVertex(id, TestingUtils.defaultExecutor());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 19e2d6d..7427a1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -52,8 +52,8 @@ public class ExecutionStateProgressTest {
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jid,
"test job",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index b10b796..7b6c6ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -39,8 +39,10 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.junit.Test;
+
import scala.concurrent.ExecutionContext;
@SuppressWarnings("serial")
@@ -121,7 +123,6 @@ public class ExecutionVertexCancelTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -187,7 +188,6 @@ public class ExecutionVertexCancelTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -249,11 +249,10 @@ public class ExecutionVertexCancelTest {
public void testCancelFromRunning() {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
@@ -268,7 +267,7 @@ public class ExecutionVertexCancelTest {
assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
vertex.cancel();
- vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
+ vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task manager once actually canceled
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
@@ -290,11 +289,10 @@ public class ExecutionVertexCancelTest {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
@@ -339,12 +337,10 @@ public class ExecutionVertexCancelTest {
// this may happen when the task finished or failed while the call was in progress
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
@@ -376,7 +372,7 @@ public class ExecutionVertexCancelTest {
public void testCancelCallFails() {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index c04ebc6..26cb3f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.junit.Test;
import java.util.Collection;
@@ -96,7 +97,7 @@ public class ExecutionVertexDeploymentTest {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final Instance instance = getInstance(
new ActorTaskManagerGateway(
@@ -180,7 +181,7 @@ public class ExecutionVertexDeploymentTest {
public void testDeployFailedSynchronous() {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 36b7575..47863ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -39,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -207,8 +207,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
null,
testJob,
new Configuration(),
- Executors.directExecutor(),
- Executors.directExecutor(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
getClass().getClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
Time.of(10, TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 44dc0a4..32867bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import static org.mockito.Mockito.mock;
@@ -48,7 +49,7 @@ public class LegacyJobVertexIdTest {
jobVertex.setInvokableClass(AbstractInvokable.class);
ExecutionGraph executionGraph = new ExecutionGraph(
- mock(Executor.class),
+ mock(ScheduledExecutorService.class),
mock(Executor.class),
new JobID(),
"test",
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0e147e3..3a7e759 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,8 +66,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -112,8 +112,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -159,8 +159,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -207,8 +207,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -253,8 +253,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -319,8 +319,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
@@ -376,8 +376,8 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
jobName,
cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 5b1a03e..afdafe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -180,8 +180,8 @@ public class TerminalStateDeadlockTest {
TestExecGraph(JobID jobId) throws IOException {
super(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
jobId,
"test graph",
EMPTY_CONFIG,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 27708a2..4e1bfae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,8 +80,8 @@ public class VertexSlotSharingTest {
List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new JobID(),
"test job",
new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 398505f..8985a34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -80,11 +79,13 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import scala.Int;
import scala.Option;
import scala.PartialFunction;
@@ -105,8 +106,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -156,8 +156,6 @@ public class JobManagerHARecoveryTest {
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
- ExecutorService executor = null;
-
try {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
@@ -175,13 +173,11 @@ public class JobManagerHARecoveryTest {
MemoryArchivist.class,
10), "archive");
- executor = new ForkJoinPool();
-
Props jobManagerProps = Props.create(
TestingJobManager.class,
flinkConfiguration,
- executor,
- executor,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
instanceManager,
scheduler,
new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
@@ -309,10 +305,6 @@ public class JobManagerHARecoveryTest {
if (taskManager != null) {
taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
-
- if (executor != null) {
- executor.shutdownNow();
- }
}
}
@@ -353,8 +345,8 @@ public class JobManagerHARecoveryTest {
Props jobManagerProps = Props.create(
TestingFailingHAJobManager.class,
flinkConfiguration,
- Executors.directExecutor(),
- Executors.directExecutor(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
mock(InstanceManager.class),
mock(Scheduler.class),
new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
@@ -390,7 +382,7 @@ public class JobManagerHARecoveryTest {
public TestingFailingHAJobManager(
Configuration flinkConfiguration,
- Executor futureExecutor,
+ ScheduledExecutorService futureExecutor,
Executor ioExecutor,
InstanceManager instanceManager,
Scheduler scheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b627273..c5f6d99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -577,8 +577,8 @@ public class JobManagerTest {
JobManager.startJobManagerActors(
config,
system,
- system.dispatcher(),
- system.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
TestingJobManager.class,
MemoryArchivist.class)._1(),
leaderSessionId);
@@ -787,8 +787,8 @@ public class JobManagerTest {
Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
config,
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
@@ -912,8 +912,8 @@ public class JobManagerTest {
Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
config,
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
@@ -1017,8 +1017,8 @@ public class JobManagerTest {
Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
config,
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
@@ -1116,8 +1116,8 @@ public class JobManagerTest {
Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
new Configuration(),
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c5b6697..feb3d4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;
@@ -84,8 +85,8 @@ public class JobSubmitTest {
JobManager.startJobManagerActors(
jmConfig,
jobManagerSystem,
- jobManagerSystem.dispatcher(),
- jobManagerSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index f051281..d6257ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
@@ -45,18 +46,18 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.forkjoin.ForkJoinPool;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class JobManagerLeaderElectionTest extends TestLogger {
@@ -66,8 +67,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
private static ActorSystem actorSystem;
private static TestingServer testingServer;
- private static ExecutorService executor;
-
+
private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
@@ -75,7 +75,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
public static void setup() throws Exception {
actorSystem = ActorSystem.create("TestingActorSystem");
testingServer = new TestingServer();
- executor = new ForkJoinPool();
}
@AfterClass
@@ -87,10 +86,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
if (testingServer != null) {
testingServer.stop();
}
-
- if (executor != null) {
- executor.shutdownNow();
- }
}
/**
@@ -185,8 +180,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
return Props.create(
TestingJobManager.class,
configuration,
- executor,
- executor,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new InstanceManager(),
new Scheduler(TestingUtils.defaultExecutionContext()),
new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index cb37905..aed2b6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -60,8 +61,8 @@ public class TaskManagerMetricsTest {
final ActorRef jobManager = JobManager.startJobManagerActors(
new Configuration(),
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 873ec8e..a6e1a2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -52,6 +51,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.junit.Test;
@@ -86,8 +86,8 @@ public class TaskManagerComponentsStartupShutdownTest {
final ActorRef jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 385d1ac..2528e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
import org.junit.Test;
@@ -100,8 +101,8 @@ public abstract class TaskManagerProcessReapingTestBase {
ActorRef jmActor = JobManager.startJobManagerActors(
new Configuration(),
jmActorSystem,
- jmActorSystem.dispatcher(),
- jmActorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
JobManager.class,
MemoryArchivist.class)._1;
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 5753349..e234cba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -108,8 +108,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
// a simple JobManager
jobManager = createJobManager(
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
config);
startResourceManager(config, jobManager.actor());
@@ -192,8 +192,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
// now start the JobManager, with the regular akka URL
jobManager = createJobManager(
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
new Configuration());
startResourceManager(config, jobManager.actor());
@@ -635,8 +635,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
return JobManager.startJobManagerActors(
configuration,
actorSystem,
- actorSystem.dispatcher(),
- actorSystem.dispatcher(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
NONE_STRING,
NONE_STRING,
JobManager.class,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
new file mode 100644
index 0000000..06d632b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.concurrent.Executors;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This implements a variant of the {@link Executors#directExecutor()} that also implements the
+ * {@link ScheduledExecutorService} interface.
+ * Scheduled executables are actually scheduled, all other (call / execute) execute synchronously.
+ */
+public class DirectScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
+
+ private final ScheduledExecutorService scheduledService =
+ java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+
+ // ------------------------------------------------------------------------
+ // Direct Executor
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void execute(Runnable command) {
+ if (!isShutdown()) {
+ command.run();
+ } else {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Scheduled Executor
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return scheduledService.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return scheduledService.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return scheduledService.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return scheduledService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void shutdown() {
+ scheduledService.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return scheduledService.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return scheduledService.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return scheduledService.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return scheduledService.awaitTermination(timeout, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 6f833f1..258e44e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,19 +18,22 @@
package org.apache.flink.runtime.executiongraph
+import java.util.concurrent.Executors
+
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.apache.flink.runtime.testtasks.NoOpInvokable
import org.apache.flink.util.SerializedValue
+
import org.junit.runner.RunWith
+
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike}
@@ -39,6 +42,8 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
"A task manager loss" must {
"fail the assigned tasks" in {
+ val executor = Executors.newScheduledThreadPool(1)
+
try {
val instance1 = ExecutionGraphTestUtils.getInstance(
new ActorTaskManagerGateway(new SimpleActorGateway(TestingUtils.defaultExecutionContext)),
@@ -58,8 +63,8 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
val jobGraph = new JobGraph("Pointwise job", sender)
val eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext,
- TestingUtils.defaultExecutionContext,
+ executor,
+ executor,
new JobID(),
"test job",
new Configuration(),
@@ -81,6 +86,9 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
t.printStackTrace()
fail(t.getMessage)
}
+ finally {
+ executor.shutdownNow()
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index cf00206..dfcbf77 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager
import java.net.InetAddress
+import java.util.concurrent.{Executors, ScheduledExecutorService}
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
@@ -51,7 +52,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
+ val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+
override def afterAll(): Unit = {
+ executor.shutdownNow()
TestKit.shutdownActorSystem(system)
}
@@ -172,8 +176,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
val (jm: ActorRef, _) = JobManager.startJobManagerActors(
new Configuration(),
_system,
- _system.dispatcher,
- _system.dispatcher,
+ executor,
+ executor,
None,
None,
classOf[JobManager],
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index d6215eb..bd4a8fc 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.testingUtils
import java.io.IOException
-import java.util.concurrent.{Executor, TimeUnit, TimeoutException}
+import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit, TimeoutException}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.Patterns._
@@ -85,7 +85,7 @@ class TestingCluster(
override def getJobManagerProps(
jobManagerClass: Class[_ <: JobManager],
configuration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 39c7a53..f50a832 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.testingUtils
-import java.util.concurrent.{Executor, ExecutorService}
+import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
@@ -39,7 +39,7 @@ import scala.language.postfixOps
*/
class TestingJobManager(
flinkConfiguration: Configuration,
- futureExecutor: Executor,
+ futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,