You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:16 UTC
[09/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Use single timeout task for SlotManager
[FLINK-5810] [flip-6] Use single timeout task for SlotManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d75ec5b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d75ec5b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d75ec5b3
Branch: refs/heads/table-retraction
Commit: d75ec5b3551573d4eb1886c8e75dfdf6dc328da1
Parents: d16a5a2
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Apr 27 17:46:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:28:26 2017 +0200
----------------------------------------------------------------------
.../slotmanager/PendingSlotRequest.java | 35 +--
.../slotmanager/SlotManager.java | 219 ++++++++------
.../slotmanager/TaskManagerRegistration.java | 38 +--
.../clusterframework/ResourceManagerTest.java | 8 +-
.../slotmanager/SlotManagerTest.java | 291 +++++++++----------
.../slotmanager/SlotProtocolTest.java | 2 -
.../src/test/resources/log4j-test.properties | 2 +-
.../runtime/testingUtils/TestingUtils.scala | 4 +-
8 files changed, 295 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 1195791..ffe1bfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -27,8 +27,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
public class PendingSlotRequest {
@@ -37,14 +35,12 @@ public class PendingSlotRequest {
@Nullable
private CompletableFuture<Acknowledge> requestFuture;
- @Nullable
- private UUID timeoutIdentifier;
-
- @Nullable
- private ScheduledFuture<?> timeoutFuture;
+ /** Timestamp when this pending slot request has been created. */
+ private final long creationTimestamp;
public PendingSlotRequest(SlotRequest slotRequest) {
this.slotRequest = Preconditions.checkNotNull(slotRequest);
+ creationTimestamp = System.currentTimeMillis();
}
// ------------------------------------------------------------------------
@@ -57,11 +53,6 @@ public class PendingSlotRequest {
return slotRequest.getResourceProfile();
}
- @Nullable
- public UUID getTimeoutIdentifier() {
- return timeoutIdentifier;
- }
-
public JobID getJobId() {
return slotRequest.getJobId();
}
@@ -70,6 +61,10 @@ public class PendingSlotRequest {
return slotRequest.getTargetAddress();
}
+ public long getCreationTimestamp() {
+ return creationTimestamp;
+ }
+
public boolean isAssigned() {
return null != requestFuture;
}
@@ -82,20 +77,4 @@ public class PendingSlotRequest {
public CompletableFuture<Acknowledge> getRequestFuture() {
return requestFuture;
}
-
- public void cancelTimeout() {
- if (timeoutFuture != null) {
- timeoutFuture.cancel(true);
-
- timeoutIdentifier = null;
- timeoutFuture = null;
- }
- }
-
- public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
- cancelTimeout();
-
- timeoutFuture = newTimeoutFuture;
- timeoutIdentifier = newTimeoutIdentifier;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f09b73a..829a06d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -64,11 +64,10 @@ import java.util.concurrent.TimeoutException;
* {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
*
* In order to free resources and avoid resource leaks, idling task managers (task managers whose
- * slots are currently not used) and not fulfilled pending slot requests time out triggering their
- * release and failure, respectively.
+ * slots are currently not used) and pending slot requests time out triggering their release and
+ * failure, respectively.
*/
public class SlotManager implements AutoCloseable {
-
private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
/** Scheduled executor for timeouts */
@@ -107,6 +106,10 @@ public class SlotManager implements AutoCloseable {
/** Callbacks for resource (de-)allocations */
private ResourceManagerActions resourceManagerActions;
+ private ScheduledFuture<?> taskManagerTimeoutCheck;
+
+ private ScheduledFuture<?> slotRequestTimeoutCheck;
+
/** True iff the component has been started */
private boolean started;
@@ -128,6 +131,10 @@ public class SlotManager implements AutoCloseable {
leaderId = null;
resourceManagerActions = null;
+ mainThreadExecutor = null;
+ taskManagerTimeoutCheck = null;
+ slotRequestTimeoutCheck = null;
+
started = false;
}
@@ -142,17 +149,52 @@ public class SlotManager implements AutoCloseable {
* @param newResourceManagerActions to use for resource (de-)allocations
*/
public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+ LOG.info("Starting the SlotManager.");
+
leaderId = Preconditions.checkNotNull(newLeaderId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
started = true;
+
+ taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ checkTaskManagerTimeouts();
+ }
+ });
+ }
+ }, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ checkSlotRequestTimeouts();
+ }
+ });
+ }
+ }, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
/**
* Suspends the component. This clears the internal state of the slot manager.
*/
public void suspend() {
+ LOG.info("Suspending the SlotManager.");
+
+ // stop the timeout checks for the TaskManagers and the SlotRequests
+ taskManagerTimeoutCheck.cancel(false);
+ slotRequestTimeoutCheck.cancel(false);
+
+ taskManagerTimeoutCheck = null;
+ slotRequestTimeoutCheck = null;
+
for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
cancelPendingSlotRequest(pendingSlotRequest);
}
@@ -177,6 +219,8 @@ public class SlotManager implements AutoCloseable {
*/
@Override
public void close() throws Exception {
+ LOG.info("Closing the SlotManager.");
+
suspend();
}
@@ -249,6 +293,8 @@ public class SlotManager implements AutoCloseable {
public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
+ LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
@@ -272,8 +318,13 @@ public class SlotManager implements AutoCloseable {
taskExecutorConnection);
}
- if (!anySlotUsed(taskManagerRegistration.getSlots())) {
- registerTaskManagerTimeout(taskManagerRegistration);
+ // determine if the task manager is idle or not
+ boolean idle = !anySlotUsed(taskManagerRegistration.getSlots());
+
+ if (idle) {
+ taskManagerRegistration.markIdle();
+ } else {
+ taskManagerRegistration.markUsed();
}
}
@@ -292,9 +343,7 @@ public class SlotManager implements AutoCloseable {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
if (null != taskManagerRegistration) {
- removeSlots(taskManagerRegistration.getSlots());
-
- taskManagerRegistration.cancelTimeout();
+ internalUnregisterTaskManager(taskManagerRegistration);
return true;
} else {
@@ -334,8 +383,11 @@ public class SlotManager implements AutoCloseable {
}
if (idle) {
- // no slot of this task manager is being used --> register timer to free this resource
- registerTaskManagerTimeout(taskManagerRegistration);
+ // no slot of this task manager is being used --> mark this task manager as idle, which allows it to
+ // time out
+ taskManagerRegistration.markIdle();
+ } else {
+ taskManagerRegistration.markUsed();
}
return true;
@@ -371,9 +423,14 @@ public class SlotManager implements AutoCloseable {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
- if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
- registerTaskManagerTimeout(taskManagerRegistration);
+ if (null != taskManagerRegistration) {
+ if (anySlotUsed(taskManagerRegistration.getSlots())) {
+ taskManagerRegistration.markUsed();
+ } else {
+ taskManagerRegistration.markIdle();
+ }
}
+
} else {
LOG.debug("Received request to free slot {} with expected allocation id {}, " +
"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -524,8 +581,8 @@ public class SlotManager implements AutoCloseable {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
if (null != taskManagerRegistration) {
- // disable any registered time out for the task manager
- taskManagerRegistration.cancelTimeout();
+ // mark this TaskManager to be used to exempt it from timing out
+ taskManagerRegistration.markUsed();
}
}
@@ -551,24 +608,6 @@ public class SlotManager implements AutoCloseable {
if (taskManagerSlot != null) {
allocateSlot(taskManagerSlot, pendingSlotRequest);
} else {
- final UUID timeoutIdentifier = UUID.randomUUID();
- final AllocationID allocationId = pendingSlotRequest.getAllocationId();
-
- // register timeout for slot request
- ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
- @Override
- public void run() {
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- timeoutSlotRequest(allocationId, timeoutIdentifier);
- }
- });
- }
- }, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier);
-
resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
}
}
@@ -591,6 +630,16 @@ public class SlotManager implements AutoCloseable {
taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ // mark the task manager to be used since we have a pending slot request assigned to one of its slots
+ taskManagerRegistration.markUsed();
+ } else {
+ throw new IllegalStateException("Could not find a registered task manager for instance id " +
+ taskManagerSlot.getInstanceId() + '.');
+ }
+
// RPC call to the task manager
Future<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
@@ -717,7 +766,7 @@ public class SlotManager implements AutoCloseable {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
- registerTaskManagerTimeout(taskManagerRegistration);
+ taskManagerRegistration.markIdle();
}
} else {
LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId);
@@ -778,8 +827,6 @@ public class SlotManager implements AutoCloseable {
* @param pendingSlotRequest to cancel
*/
private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
- pendingSlotRequest.cancelTimeout();
-
CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
if (null != request) {
@@ -791,54 +838,50 @@ public class SlotManager implements AutoCloseable {
// Internal timeout methods
// ---------------------------------------------------------------------------------------------
- private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) {
- TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
+ private void checkTaskManagerTimeouts() {
+ if (!taskManagerRegistrations.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+
+ Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = taskManagerRegistrations.entrySet().iterator();
+
+ while (taskManagerRegistrationIterator.hasNext()) {
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrationIterator.next().getValue();
- if (null != taskManagerRegistration) {
- if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) {
if (anySlotUsed(taskManagerRegistration.getSlots())) {
- LOG.debug("Cannot release the task manager with instance id {}, because some " +
- "of its slots are still being used.", instanceId);
- } else {
- unregisterTaskManager(instanceId);
+ taskManagerRegistration.markUsed();
+ } else if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+ taskManagerRegistrationIterator.remove();
- resourceManagerActions.releaseResource(instanceId);
- }
- } else {
- taskManagerRegistrations.put(instanceId, taskManagerRegistration);
+ internalUnregisterTaskManager(taskManagerRegistration);
- LOG.debug("Expected timeout identifier {} differs from the task manager's " +
- "timeout identifier {}. Ignoring the task manager timeout call.",
- timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier());
+ resourceManagerActions.releaseResource(taskManagerRegistration.getInstanceId());
+ }
}
- } else {
- LOG.debug("Could not find a registered task manager with instance id {}. Ignoring the task manager timeout call.", instanceId);
}
}
- private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) {
- PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
+ private void checkSlotRequestTimeouts() {
+ if (!pendingSlotRequests.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
- if (null != pendingSlotRequest) {
- if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) {
- if (!pendingSlotRequest.isAssigned()) {
+ Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+
+ while (slotRequestIterator.hasNext()) {
+ PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
+
+ if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
+ slotRequestIterator.remove();
+
+ if (slotRequest.isAssigned()) {
+ cancelPendingSlotRequest(slotRequest);
+ }
resourceManagerActions.notifyAllocationFailure(
- pendingSlotRequest.getJobId(),
- allocationId,
+ slotRequest.getJobId(),
+ slotRequest.getAllocationId(),
new TimeoutException("The allocation could not be fulfilled in time."));
- } else {
- LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId);
}
- } else {
- pendingSlotRequests.put(allocationId, pendingSlotRequest);
-
- LOG.debug("Expected timeout identifier {} differs from the pending slot request's " +
- "timeout identifier {}. Ignoring the slot request timeout call.",
- timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier());
}
- } else {
- LOG.debug("Could not find pending slot request with allocation id {}. Ignoring the slot request timeout call.", allocationId);
}
}
@@ -846,6 +889,12 @@ public class SlotManager implements AutoCloseable {
// Internal utility methods
// ---------------------------------------------------------------------------------------------
+ private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration) {
+ Preconditions.checkNotNull(taskManagerRegistration);
+
+ removeSlots(taskManagerRegistration.getSlots());
+ }
+
private boolean checkDuplicateRequest(AllocationID allocationId) {
return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId);
}
@@ -853,38 +902,18 @@ public class SlotManager implements AutoCloseable {
private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
if (null != slotsToCheck) {
- boolean idle = true;
-
for (SlotID slotId : slotsToCheck) {
TaskManagerSlot taskManagerSlot = slots.get(slotId);
if (null != taskManagerSlot) {
- idle &= taskManagerSlot.isFree();
+ if (taskManagerSlot.isAllocated()) {
+ return true;
+ }
}
}
-
- return !idle;
- } else {
- return false;
}
- }
-
- private void registerTaskManagerTimeout(final TaskManagerRegistration taskManagerRegistration) {
- final UUID timeoutIdentifier = UUID.randomUUID();
-
- ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
- @Override
- public void run() {
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
- }
- });
- }
- }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier);
+ return false;
}
private void checkInit() {
@@ -911,11 +940,11 @@ public class SlotManager implements AutoCloseable {
}
@VisibleForTesting
- boolean hasTimeoutRegistered(InstanceID instanceId) {
+ boolean isTaskManagerIdle(InstanceID instanceId) {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
if (null != taskManagerRegistration) {
- return taskManagerRegistration.getTimeoutIdentifier() != null;
+ return taskManagerRegistration.isIdle();
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 3a15cb3..7d3764c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
public class TaskManagerRegistration {
@@ -34,9 +32,8 @@ public class TaskManagerRegistration {
private final HashSet<SlotID> slots;
- private UUID timeoutIdentifier;
-
- private ScheduledFuture<?> timeoutFuture;
+ /** Timestamp of when this task manager last became idle; Long.MAX_VALUE while it is not idle. */
+ private long idleSince;
public TaskManagerRegistration(
TaskExecutorConnection taskManagerConnection,
@@ -47,8 +44,7 @@ public class TaskManagerRegistration {
this.slots = new HashSet<>(slots);
- timeoutIdentifier = null;
- timeoutFuture = null;
+ idleSince = Long.MAX_VALUE;
}
public TaskExecutorConnection getTaskManagerConnection() {
@@ -59,31 +55,27 @@ public class TaskManagerRegistration {
return taskManagerConnection.getInstanceID();
}
- public UUID getTimeoutIdentifier() {
- return timeoutIdentifier;
- }
-
public Iterable<SlotID> getSlots() {
return slots;
}
- public boolean containsSlot(SlotID slotId) {
- return slots.contains(slotId);
+ public long getIdleSince() {
+ return idleSince;
}
- public void cancelTimeout() {
- if (null != timeoutFuture) {
- timeoutFuture.cancel(false);
+ public boolean isIdle() {
+ return idleSince != Long.MAX_VALUE;
+ }
- timeoutFuture = null;
- timeoutIdentifier = null;
- }
+ public void markIdle() {
+ idleSince = System.currentTimeMillis();
}
- public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
- cancelTimeout();
+ public void markUsed() {
+ idleSince = Long.MAX_VALUE;
+ }
- timeoutFuture = newTimeoutFuture;
- timeoutIdentifier = newTimeoutIdentifier;
+ public boolean containsSlot(SlotID slotId) {
+ return slots.contains(slotId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 41c2e16..c740518 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -488,13 +488,17 @@ public class ResourceManagerTest extends TestLogger {
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
final MetricRegistry metricRegistry = mock(MetricRegistry.class);
final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
rpcService.getScheduledExecutor(),
Time.minutes(5L));
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final SlotManager slotManager = new SlotManager(
+ TestingUtils.defaultScheduledExecutor(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
try {
final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
@@ -504,7 +508,7 @@ public class ResourceManagerTest extends TestLogger {
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
- slotManagerFactory,
+ slotManager,
metricRegistry,
jobLeaderIdService,
testingFatalErrorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fff2829..39c5f25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,10 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.*;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -46,9 +44,11 @@ import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -648,7 +648,7 @@ public class SlotManagerTest extends TestLogger {
*/
@Test
public void testTaskManagerTimeout() throws Exception {
- final long tmTimeout = 50L;
+ final long tmTimeout = 500L;
final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
final UUID leaderId = UUID.randomUUID();
@@ -661,7 +661,7 @@ public class SlotManagerTest extends TestLogger {
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
- final Executor mainThreadExecutor = mock(Executor.class);
+ final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
try (SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
@@ -671,24 +671,21 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- slotManager.registerTaskManager(taskManagerConnection, slotReport);
-
- ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-
- verify(mainThreadExecutor, timeout(tmTimeout * 10L)).execute(runnableArgumentCaptor.capture());
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ slotManager.registerTaskManager(taskManagerConnection, slotReport);
+ }
+ });
- // the only runnable being executed by the main thread executor should be the timeout runnable
- Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
-
- timeoutRunnable.run();
-
- verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
+ verify(resourceManagerActions, timeout(100L * tmTimeout).times(1))
+ .releaseResource(eq(taskManagerConnection.getInstanceID()));
}
}
/**
* Tests that slot requests time out after the specified request timeout. If a slot request
- * times out, then the request is cancelled, removed from the slot manager and the resourc
+ * times out, then the request is cancelled, removed from the slot manager and the resource
* manager is notified about the failed allocation.
*/
@Test
@@ -703,7 +700,7 @@ public class SlotManagerTest extends TestLogger {
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final Executor mainThreadExecutor = mock(Executor.class);
+ final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
try (SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
@@ -713,21 +710,27 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- assertTrue(slotManager.registerSlotRequest(slotRequest));
-
- ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-
- verify(mainThreadExecutor, timeout(allocationTimeout * 10L)).execute(runnableArgumentCaptor.capture());
-
- // the only runnable being executed by the main thread executor should be the timeout runnable
- Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+ final AtomicReference<Exception> atomicException = new AtomicReference<>(null);
- timeoutRunnable.run();
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(slotManager.registerSlotRequest(slotRequest));
+ } catch (Exception e) {
+ atomicException.compareAndSet(null, e);
+ }
+ }
+ });
- verify(resourceManagerActions, times(1)).notifyAllocationFailure(
+ verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure(
eq(jobId),
eq(allocationId),
any(TimeoutException.class));
+
+ if (atomicException.get() != null) {
+ throw atomicException.get();
+ }
}
}
@@ -815,6 +818,7 @@ public class SlotManagerTest extends TestLogger {
@Test
@SuppressWarnings("unchecked")
public void testSlotReportWhileActiveSlotRequest() throws Exception {
+ final long verifyTimeout = 1000L;
final UUID leaderId = UUID.randomUUID();
final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
@@ -842,20 +846,37 @@ public class SlotManagerTest extends TestLogger {
final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
- // we have to manually trigger the future call backs to simulate the main thread executor behaviour
- final Executor mainThreadExecutorMock = mock(Executor.class);
+ final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
- try (SlotManager slotManager = new SlotManager(
+ try (final SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {
- slotManager.start(leaderId, mainThreadExecutorMock, resourceManagerActions);
-
- slotManager.registerTaskManager(taskManagerConnection, slotReport);
+ slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- slotManager.registerSlotRequest(slotRequest);
+ Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+ return null;
+ }
+ }, mainThreadExecutor)
+ .thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ try {
+ slotManager.registerSlotRequest(slotRequest);
+ } catch (SlotManagerException e) {
+ throw new RuntimeException("Could not register slots.", e);
+ }
+ }
+ });
+
+ // check that no exception has been thrown
+ registrationFuture.get();
ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
@@ -867,26 +888,33 @@ public class SlotManagerTest extends TestLogger {
eq(leaderId),
any(Time.class));
- final SlotID requestedSlotdId = slotIdCaptor.getValue();
- final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? slotId2 : slotId1;
+ final SlotID requestedSlotId = slotIdCaptor.getValue();
+ final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
+
+ Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return slotManager.getSlot(freeSlotId).isFree();
+ }
+ }, mainThreadExecutor);
- assertTrue(slotManager.getSlot(freeSlotId).isFree());
+ assertTrue(freeSlotFuture.get());
final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID());
final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
- // this should update the slot with the pending slot request triggering the reassignment of it
- slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
+ FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // this should update the slot with the pending slot request triggering the reassignment of it
+ slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
- ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture());
+ return null;
+ }
+ }, mainThreadExecutor);
- Runnable requestFailureRunnable = runnableArgumentCaptor.getValue();
-
- requestFailureRunnable.run();
-
- verify(taskExecutorGateway, times(2)).requestSlot(
+ verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
slotIdCaptor.capture(),
eq(jobId),
eq(allocationId),
@@ -894,16 +922,18 @@ public class SlotManagerTest extends TestLogger {
eq(leaderId),
any(Time.class));
- verify(mainThreadExecutorMock, times(2)).execute(runnableArgumentCaptor.capture());
- Runnable requestSuccessRunnable = runnableArgumentCaptor.getValue();
+ final SlotID requestedSlotId2 = slotIdCaptor.getValue();
- requestSuccessRunnable.run();
-
- final SlotID requestedSlotId = slotIdCaptor.getValue();
+ assertEquals(slotId2, requestedSlotId2);
- assertEquals(slotId2, requestedSlotId);
+ Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+ @Override
+ public TaskManagerSlot call() throws Exception {
+ return slotManager.getSlot(requestedSlotId2);
+ }
+ }, mainThreadExecutor);
- TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);
+ TaskManagerSlot slot = requestedSlotFuture.get();
assertTrue(slot.isAllocated());
assertEquals(allocationId, slot.getAllocationId());
@@ -916,11 +946,12 @@ public class SlotManagerTest extends TestLogger {
*/
@Test
public void testTimeoutForUnusedTaskManager() throws Exception {
- final long taskManagerTimeout = 123456L;
+ final long taskManagerTimeout = 50L;
+ final long verifyTimeout = taskManagerTimeout * 10L;
final UUID leaderId = UUID.randomUUID();
final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
- final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+ final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
final ResourceID resourceId = ResourceID.generate();
@@ -946,21 +977,34 @@ public class SlotManagerTest extends TestLogger {
final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
- try (SlotManager slotManager = new SlotManager(
+ final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
+
+ try (final SlotManager slotManager = new SlotManager(
scheduledExecutor,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
- slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
-
- slotManager.registerSlotRequest(slotRequest);
+ slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+ FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ slotManager.registerSlotRequest(slotRequest);
+
+ return null;
+ }
+ }, mainThreadExecutor)
+ .thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+ }
+ });
ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
- verify(taskExecutorGateway).requestSlot(
+ verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot(
slotIdArgumentCaptor.capture(),
eq(jobId),
eq(allocationId),
@@ -968,103 +1012,48 @@ public class SlotManagerTest extends TestLogger {
eq(leaderId),
any(Time.class));
- assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
-
- SlotID slotId = slotIdArgumentCaptor.getValue();
- TaskManagerSlot slot = slotManager.getSlot(slotId);
-
- assertTrue(slot.isAllocated());
- assertEquals(allocationId, slot.getAllocationId());
-
- slotManager.freeSlot(slotId, allocationId);
+ Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+ }
+ }, mainThreadExecutor);
- assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+ // check that the TaskManager is not idle
+ assertFalse(idleFuture.get());
- ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+ final SlotID slotId = slotIdArgumentCaptor.getValue();
- // filter out the schedule call for the task manager which will be registered using the
- // taskManagerTimeout value
- verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS));
+ Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+ @Override
+ public TaskManagerSlot call() throws Exception {
+ return slotManager.getSlot(slotId);
+ }
+ }, mainThreadExecutor);
- Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+ TaskManagerSlot slot = slotFuture.get();
- timeoutRunnable.run();
-
- verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
- }
- }
-
- /**
- * Tests that the slot manager re-registers a timeout for a rejected slot request.
- */
- @Test
- public void testTimeoutForRejectedSlotRequest() throws Exception {
-
- final long slotRequestTimeout = 1337L;
- final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
-
- final ResourceID resourceId = ResourceID.generate();
- final SlotID slotId = new SlotID(resourceId, 0);
- final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
- final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
- final SlotReport slotReport = new SlotReport(slotStatus);
-
- final UUID leaderId = UUID.randomUUID();
- final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
-
- final JobID jobId = new JobID();
- final AllocationID allocationId = new AllocationID();
- final AllocationID allocationId2 = new AllocationID();
- final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-
- CompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<>();
-
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(leaderId),
- any(Time.class))).thenReturn(requestFuture);
-
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
- try (SlotManager slotManager = new SlotManager(
- scheduledExecutor,
- TestingUtils.infiniteTime(),
- Time.milliseconds(slotRequestTimeout),
- TestingUtils.infiniteTime())) {
-
- slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
-
- slotManager.registerTaskManager(taskManagerConnection, slotReport);
-
- slotManager.registerSlotRequest(slotRequest);
-
- verify(taskExecutorGateway).requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(leaderId),
- any(Time.class));
-
- requestFuture.completeExceptionally(new SlotOccupiedException("Slot is already occupied", allocationId2));
-
- ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS));
+ assertTrue(slot.isAllocated());
+ assertEquals(allocationId, slot.getAllocationId());
- Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+ Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ slotManager.freeSlot(slotId, allocationId);
- timeoutRunnable.run();
+ return null;
+ }
+ }, mainThreadExecutor)
+ .thenApply(new ApplyFunction<Void, Boolean>() {
+ @Override
+ public Boolean apply(Void value) {
+ return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+ }
+ });
- verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), eq(allocationId), any(Exception.class));
+ assertTrue(idleFuture2.get());
- TaskManagerSlot slot = slotManager.getSlot(slotId);
-
- assertTrue(slot.isAllocated());
- assertEquals(allocationId2, slot.getAllocationId());
+ verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index c09316c..a1ab1ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -76,7 +76,6 @@ public class SlotProtocolTest extends TestLogger {
@Test
public void testSlotsUnavailableRequest() throws Exception {
final JobID jobID = new JobID();
- final ResourceID jmResourceId = new ResourceID(jmAddress);
final UUID rmLeaderID = UUID.randomUUID();
@@ -133,7 +132,6 @@ public class SlotProtocolTest extends TestLogger {
@Test
public void testSlotAvailableRequest() throws Exception {
final JobID jobID = new JobID();
- final ResourceID jmResourceId = new ResourceID(jmAddress);
final UUID rmLeaderID = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..812a256 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, console
+log4j.rootLogger=DEBUG, console
# -----------------------------------------------------------------------------
# Console (use 'console')
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 03b5172..876e26b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -82,7 +82,7 @@ object TestingUtils {
def getDefaultTestingActorSystemConfig = testConfig
def infiniteTime: Time = {
- Time.milliseconds(Long.MaxValue);
+ Time.milliseconds(Integer.MAX_VALUE);
}
@@ -113,7 +113,7 @@ object TestingUtils {
def defaultExecutor: ScheduledExecutorService = {
synchronized {
if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) {
- sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor()
+ sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
}
sharedExecutorInstance