You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 21:55:55 UTC
[1/2] flink git commit: [FLINK-8504] [flip6] Deregister jobs from the
JobLeaderService when no more slots allocated
Repository: flink
Updated Branches:
refs/heads/master 0e20b6130 -> e94a488dd
[FLINK-8504] [flip6] Deregister jobs from the JobLeaderService when no more slots allocated
Let the TaskExecutor deregister jobs from the JobLeaderService once it has no more slots
for this job allocated.
This closes #5361.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e94a488d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e94a488d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e94a488d
Branch: refs/heads/master
Commit: e94a488dd78e7c2efdf55a67cea886ee15a641a6
Parents: 23ff120
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 13:50:43 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 30 18:30:17 2018 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/JobLeaderService.java | 40 +++---
.../runtime/taskexecutor/TaskExecutor.java | 80 ++++++------
.../taskexecutor/slot/TaskSlotTable.java | 45 ++++++-
.../runtime/taskexecutor/TaskExecutorTest.java | 121 ++++++++++++++++++-
4 files changed, 223 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 3b4da4e..5376362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,11 +38,11 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
/**
@@ -80,7 +81,9 @@ public class JobLeaderService {
public JobLeaderService(TaskManagerLocation location) {
this.ownLocation = Preconditions.checkNotNull(location);
- jobLeaderServices = new HashMap<>(4);
+ // Has to be a concurrent hash map because tests might access this service
+ // concurrently via containsJob
+ jobLeaderServices = new ConcurrentHashMap<>(4);
state = JobLeaderService.State.CREATED;
@@ -147,18 +150,6 @@ public class JobLeaderService {
}
/**
- * Check whether the service monitors the given job.
- *
- * @param jobId identifying the job
- * @return True if the given job is monitored; otherwise false
- */
- public boolean containsJob(JobID jobId) {
- Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
-
- return jobLeaderServices.containsKey(jobId);
- }
-
- /**
* Remove the given job from being monitored by the job leader service.
*
* @param jobId identifying the job to remove from monitoring
@@ -199,9 +190,9 @@ public class JobLeaderService {
JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
- leaderRetrievalService.start(jobManagerLeaderListener);
-
jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));
+
+ leaderRetrievalService.start(jobManagerLeaderListener);
}
/**
@@ -435,4 +426,21 @@ public class JobLeaderService {
private enum State {
CREATED, STARTED, STOPPED
}
+
+ // -----------------------------------------------------------
+ // Testing methods
+ // -----------------------------------------------------------
+
+ /**
+ * Check whether the service monitors the given job.
+ *
+ * @param jobId identifying the job
+ * @return True if the given job is monitored; otherwise false
+ */
+ @VisibleForTesting
+ public boolean containsJob(JobID jobId) {
+ Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+ return jobLeaderServices.containsKey(jobId);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ad7414c..9df2e88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -93,7 +93,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -163,7 +162,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// --------- job manager connections -----------
- private Map<ResourceID, JobManagerConnection> jobManagerConnections;
+ private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
// --------- task slot allocation table -----------
@@ -195,7 +194,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
JobLeaderService jobLeaderService,
FatalErrorHandler fatalErrorHandler) {
- super(rpcService, AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME));
+ super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
@@ -978,10 +977,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
ResourceID resourceID,
JobMasterGateway jobMasterGateway,
int blobPort) {
- Preconditions.checkNotNull(jobID);
- Preconditions.checkNotNull(resourceID);
- Preconditions.checkNotNull(jobMasterGateway);
- Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
+ checkNotNull(jobID);
+ checkNotNull(resourceID);
+ checkNotNull(jobMasterGateway);
+ checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
@@ -1029,7 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
- Preconditions.checkNotNull(jobManagerConnection);
+ checkNotNull(jobManagerConnection);
JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
jobManagerConnection.getLibraryCacheManager().shutdown();
@@ -1104,36 +1103,40 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
- Preconditions.checkNotNull(allocationId);
+ checkNotNull(allocationId);
try {
- TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
+ final JobID jobId = taskSlotTable.getOwningJob(allocationId);
- if (taskSlot != null && isConnectedToResourceManager()) {
- // the slot was freed. Tell the RM about it
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
- resourceManagerGateway.notifySlotAvailable(
- resourceManagerConnection.getRegistrationId(),
- new SlotID(getResourceID(), taskSlot.getIndex()),
- allocationId);
+ if (slotIndex != -1) {
- // check whether we still have allocated slots for the same job
- final JobID jobId = taskSlot.getJobId();
- final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+ if (isConnectedToResourceManager()) {
+ // the slot was freed. Tell the RM about it
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- if (!tasks.hasNext()) {
- // we can remove the job from the job leader service
- try {
- jobLeaderService.removeJob(jobId);
- } catch (Exception e) {
- log.info("Could not remove job {} from JobLeaderService.", jobId, e);
- }
+ resourceManagerGateway.notifySlotAvailable(
+ resourceManagerConnection.getRegistrationId(),
+ new SlotID(getResourceID(), slotIndex),
+ allocationId);
+ }
- closeJobManagerConnection(
- jobId,
- new FlinkException("TaskExecutor " + getAddress() +
- " has no more allocated slots for job " + jobId + '.'));
+ if (jobId != null) {
+ // check whether we still have allocated slots for the same job
+ if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
+ // we can remove the job from the job leader service
+ try {
+ jobLeaderService.removeJob(jobId);
+ } catch (Exception e) {
+ log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+ }
+
+ closeJobManagerConnection(
+ jobId,
+ new FlinkException("TaskExecutor " + getAddress() +
+ " has no more allocated slots for job " + jobId + '.'));
+ }
}
}
} catch (SlotNotFoundException e) {
@@ -1141,13 +1144,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
}
- private void freeSlotInternal(AllocationID allocationId) {
- freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed."));
- }
-
private void timeoutSlot(AllocationID allocationId, UUID ticket) {
- Preconditions.checkNotNull(allocationId);
- Preconditions.checkNotNull(ticket);
+ checkNotNull(allocationId);
+ checkNotNull(ticket);
if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
@@ -1285,7 +1284,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final JobMasterGateway jobMasterGateway;
private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
- this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+ this.jobMasterGateway = checkNotNull(jobMasterGateway);
}
@Override
@@ -1318,7 +1317,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void freeSlot(final AllocationID allocationId) {
- runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId));
+ runAsync(() ->
+ freeSlotInternal(
+ allocationId,
+ new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId + '.')));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index fcb2761..f8f9164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -133,6 +133,22 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
slotActions = null;
}
+ /**
+ * Returns all {@link AllocationID}s for the given job.
+ *
+ * @param jobId for which to return the set of {@link AllocationID}.
+ * @return Set of {@link AllocationID} for the given job
+ */
+ public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+ final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+ if (allocationIds == null) {
+ return Collections.emptySet();
+ } else {
+ return Collections.unmodifiableSet(allocationIds);
+ }
+ }
+
// ---------------------------------------------------------------------
// Slot report methods
// ---------------------------------------------------------------------
@@ -268,7 +284,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @throws SlotNotFoundException if there is no task slot for the given allocation id
* @return Index of the freed slot if the slot could be freed; otherwise -1
*/
- public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+ public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
}
@@ -282,8 +298,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @throws SlotNotFoundException if there is no task slot for the given allocation id
* @return Index of the freed slot if the slot could be freed; otherwise -1
*/
- @Nullable
- public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -317,7 +332,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
slotsPerJob.remove(jobId);
}
- return taskSlot;
+ return taskSlot.getIndex();
} else {
// we couldn't free the task slot because it still contains tasks, fail the tasks
// and set the slot state to releasing so that it gets eventually freed
@@ -329,7 +344,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
taskIterator.next().failExternally(cause);
}
- return null;
+ return -1;
}
} else {
throw new SlotNotFoundException(allocationId);
@@ -422,6 +437,25 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
}
+ /**
+ * Returns the owning job of the {@link TaskSlot} identified by the
+ * given {@link AllocationID}.
+ *
+ * @param allocationId identifying the slot for which to retrieve the owning job
+ * @return Owning job of the specified {@link TaskSlot} or null if there is no slot for
+ * the given allocation id or if the slot has no owning job assigned
+ */
+ @Nullable
+ public JobID getOwningJob(AllocationID allocationId) {
+ final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+ if (taskSlot != null) {
+ return taskSlot.getJobId();
+ } else {
+ return null;
+ }
+ }
+
// ---------------------------------------------------------------------
// Task methods
// ---------------------------------------------------------------------
@@ -538,6 +572,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
// Internal methods
// ---------------------------------------------------------------------
+ @Nullable
private TaskSlot getTaskSlot(AllocationID allocationId) {
Preconditions.checkNotNull(allocationId);
http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 0c3adae..efd27f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -61,12 +61,14 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -89,6 +91,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -106,7 +109,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
-import java.io.File;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
@@ -122,6 +124,7 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -143,7 +146,6 @@ import static org.mockito.Mockito.when;
public class TaskExecutorTest extends TestLogger {
private final Time timeout = Time.milliseconds(10000L);
- private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
private TimerService<AllocationID> timerService;
@@ -1034,7 +1036,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
-
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -1544,6 +1545,120 @@ public class TaskExecutorTest extends TestLogger {
}
/**
+ * Tests that a job is removed from the JobLeaderService once a TaskExecutor has
+ * no more slots assigned to this job.
+ *
+ * <p>See FLINK-8504
+ */
+ @Test
+ public void testRemoveJobFromJobLeaderService() throws Exception {
+ final Configuration configuration = new Configuration();
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
+ final JobLeaderService jobLeaderService = new JobLeaderService(localTaskManagerLocation);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(
+ Collections.singleton(ResourceProfile.UNKNOWN),
+ timerService);
+
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+
+ final TestingLeaderRetrievalService resourceManagerLeaderRetriever = new TestingLeaderRetrievalService();
+ haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ taskManagerConfiguration,
+ localTaskManagerLocation,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ new TaskExecutorLocalStateStoresManager(),
+ mock(NetworkEnvironment.class),
+ haServices,
+ new HeartbeatServices(1000L, 1000L),
+ UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ new BroadcastVariableManager(),
+ mock(FileCache.class),
+ taskSlotTable,
+ new JobManagerTable(),
+ jobLeaderService,
+ testingFatalErrorHandler);
+
+ try {
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();
+
+ rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+ resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerId.toUUID());
+
+ final JobID jobId = new JobID();
+
+ final CompletableFuture<LeaderRetrievalListener> startFuture = new CompletableFuture<>();
+ final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+
+ final StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService(
+ startFuture,
+ stopFuture);
+ haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);
+
+ taskExecutor.start();
+
+ final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ final SlotID slotId = new SlotID(localTaskManagerLocation.getResourceID(), 0);
+ final AllocationID allocationId = new AllocationID();
+
+ assertThat(startFuture.isDone(), is(false));
+ assertThat(jobLeaderService.containsJob(jobId), is(false));
+
+ taskExecutorGateway.requestSlot(
+ slotId,
+ jobId,
+ allocationId,
+ "foobar",
+ resourceManagerId,
+ timeout).get();
+
+ // wait until the job leader retrieval service for jobId is started
+ startFuture.get();
+ assertThat(jobLeaderService.containsJob(jobId), is(true));
+
+ taskExecutorGateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get();
+
+ // wait until the job leader retrieval service for jobId is stopped because it should get removed
+ stopFuture.get();
+ assertThat(jobLeaderService.containsJob(jobId), is(false));
+
+ testingFatalErrorHandler.rethrowError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
+ private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
+ private final CompletableFuture<LeaderRetrievalListener> startFuture;
+
+ private final CompletableFuture<Void> stopFuture;
+
+ private StartStopNotifyingLeaderRetrievalService(
+ CompletableFuture<LeaderRetrievalListener> startFuture,
+ CompletableFuture<Void> stopFuture) {
+ this.startFuture = startFuture;
+ this.stopFuture = stopFuture;
+ }
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ startFuture.complete(listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ stopFuture.complete(null);
+ }
+ }
+
+ /**
* Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
*/
private static final class RecordingHeartbeatServices extends HeartbeatServices {
[2/2] flink git commit: [hotfix] Rearrange TaskExecutor imports
Posted by tr...@apache.org.
[hotfix] Rearrange TaskExecutor imports
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23ff1203
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23ff1203
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23ff1203
Branch: refs/heads/master
Commit: 23ff1203a0583d2859b3f0f92f4cb0c604b83baa
Parents: 0e20b61
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 12:01:09 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 30 18:30:17 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/23ff1203/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 30cf377..ad7414c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -66,10 +66,10 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
-import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;