You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/22 21:53:20 UTC
[1/3] flink git commit: [FLINK-8463] [rest] Remove blocking of IO
executor in RestClient#submitRequest
Repository: flink
Updated Branches:
refs/heads/master 517b3f872 -> 776af4a88
[FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest
Instead of waiting on the ChannelFuture we register a ChannelFutureListener which
is notified when the channel has been established. This unblocks IO executor threads
in the RestClient.
This closes #5319.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01611802
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01611802
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01611802
Branch: refs/heads/master
Commit: 016118026ca96abb691b236ca7d08db94c93684a
Parents: 517b3f8
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 18:27:22 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 16:14:35 2018 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/rest/RestClient.java | 41 ++++++++++++--------
1 file changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/01611802/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 71891de..5af50b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -42,6 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
@@ -181,24 +181,31 @@ public class RestClient {
}
private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
- return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
- .thenApply((channel) -> {
- try {
- return channel.sync();
- } catch (InterruptedException e) {
- throw new FlinkRuntimeException(e);
+ final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
+
+ final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+
+ connectFuture.addListener(
+ (ChannelFuture future) -> {
+ if (future.isSuccess()) {
+ channelFuture.complete(future.channel());
+ } else {
+ channelFuture.completeExceptionally(future.cause());
}
- })
- .thenApply((ChannelFuture::channel))
- .thenCompose(channel -> {
- ClientHandler handler = channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonResponse> future = handler.getJsonFuture();
- channel.writeAndFlush(httpRequest);
- return future;
- }).thenComposeAsync(
+ });
+
+ return channelFuture
+ .thenComposeAsync(
+ channel -> {
+ ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+ CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+ channel.writeAndFlush(httpRequest);
+ return future;
+ },
+ executor)
+ .thenComposeAsync(
(JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
- executor
- );
+ executor);
}
private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {
[3/3] flink git commit: [FLINK-8462] [flip6] Filter invalid heartbeat
timeouts in TaskExecutor
Posted by tr...@apache.org.
[FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor
This commit properly stops the heartbeating of disconnected RMs and additionally
ignores outdated heartbeat timeouts for old RM connections out.
This closes #5318.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776af4a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776af4a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776af4a8
Branch: refs/heads/master
Commit: 776af4a882c85926fc0764b702fec717c675e34c
Parents: 9a0399c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 14:56:42 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:16 2018 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 36 ++--
.../utils/TestingResourceManagerGateway.java | 30 +++-
.../runtime/taskexecutor/TaskExecutorTest.java | 165 +++++++++++++++++++
3 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e88cf58..0f98c49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -741,15 +741,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
// drop the current connection or connection attempt
- if (resourceManagerConnection != null) {
- resourceManagerConnection.close();
- resourceManagerConnection = null;
- }
+ closeResourceManagerConnection(
+ new FlinkException("New ResourceManager leader found under: " + newLeaderAddress +
+ '(' + newResourceManagerId + ')'));
}
// establish a connection to the new leader
if (newLeaderAddress != null) {
- log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+ log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
@@ -784,13 +783,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
private void closeResourceManagerConnection(Exception cause) {
- if (isConnectedToResourceManager()) {
- log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+ if (resourceManagerConnection != null) {
- resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+ if (resourceManagerConnection.isConnected()) {
+ log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+ resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ } else {
+ log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause);
+ }
resourceManagerConnection.close();
resourceManagerConnection = null;
@@ -1361,11 +1364,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
runAsync(() -> {
- log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+ // first check whether the timeout is still valid
+ if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
+ log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+ } else {
+ log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
+ }
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index dc1635a..1428e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -47,6 +48,7 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
* Implementation of the {@link ResourceManagerGateway} for testing purposes solely.
@@ -55,7 +57,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
private final ResourceManagerId resourceManagerId;
- private final ResourceID resourceId;
+ private final ResourceID ownResourceId;
private final long heartbeatInterval;
@@ -73,6 +75,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+ private volatile Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
+
public TestingResourceManagerGateway() {
this(
ResourceManagerId.generate(),
@@ -89,7 +93,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
String address,
String hostname) {
this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
- this.resourceId = Preconditions.checkNotNull(resourceId);
+ this.ownResourceId = Preconditions.checkNotNull(resourceId);
this.heartbeatInterval = heartbeatInterval;
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
@@ -118,6 +122,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
}
+ public void setRegisterTaskExecutorFunction(Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
+ this.registerTaskExecutorFunction = registerTaskExecutorFunction;
+ }
+
@Override
public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
@@ -130,7 +138,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
new JobMasterRegistrationSuccess(
heartbeatInterval,
resourceManagerId,
- resourceId));
+ ownResourceId));
}
@Override
@@ -161,11 +169,17 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
- return CompletableFuture.completedFuture(
- new TaskExecutorRegistrationSuccess(
- new InstanceID(),
- resourceId,
- heartbeatInterval));
+ final Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
+
+ if (currentFunction != null) {
+ return currentFunction.apply(Tuple5.of(taskExecutorAddress, resourceId, slotReport, dataPort, hardwareDescription));
+ } else {
+ return CompletableFuture.completedFuture(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ ownResourceId,
+ heartbeatInterval));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2f0cbd6..0f08512 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -70,7 +70,9 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -78,10 +80,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.ExceptionUtils;
@@ -108,12 +112,16 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -137,11 +145,14 @@ public class TaskExecutorTest extends TestLogger {
private final Time timeout = Time.milliseconds(10000L);
private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
+ private TimerService<AllocationID> timerService;
+
private TestingRpcService rpc;
@Before
public void setup() {
rpc = new TestingRpcService();
+ timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
}
@After
@@ -150,6 +161,11 @@ public class TaskExecutorTest extends TestLogger {
rpc.stopService();
rpc = null;
}
+
+ if (timerService != null) {
+ timerService.stop();
+ timerService = null;
+ }
}
@Rule
@@ -1449,4 +1465,153 @@ public class TaskExecutorTest extends TestLogger {
taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
+
+ /**
+ * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
+ *
+ * <p>See FLINK-8462
+ */
+ @Test
+ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
+ final long heartbeatInterval = 1L;
+ final long heartbeatTimeout = 10000L;
+ final long pollTimeout = 1000L;
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
+ final ResourceID rmResourceID = ResourceID.generate();
+
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+ final String rmAddress = "rm";
+ final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
+ ResourceManagerId.generate(),
+ rmResourceID,
+ heartbeatInterval,
+ rmAddress,
+ rmAddress);
+
+ rpc.registerGateway(rmAddress, rmGateway);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ taskManagerConfiguration,
+ taskManagerLocation,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ new TaskExecutorLocalStateStoresManager(),
+ mock(NetworkEnvironment.class),
+ haServices,
+ heartbeatServices,
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
+ testingFatalErrorHandler);
+
+ try {
+ taskExecutor.start();
+
+ final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
+ final BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();
+
+ rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
+
+ // wait for TM registration by checking the registered heartbeat targets
+ assertThat(
+ monitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS),
+ equalTo(rmResourceID));
+
+ // let RM lose leadership
+ rmLeaderRetrievalService.notifyListener(null, null);
+
+ // the timeout should not have triggered since it is much higher
+ assertThat(unmonitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), equalTo(rmResourceID));
+
+ testingFatalErrorHandler.rethrowError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
+ /**
+ * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
+ */
+ private static final class RecordingHeartbeatServices extends HeartbeatServices {
+
+ private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+ private final BlockingQueue<ResourceID> monitoredTargets;
+
+ public RecordingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+ super(heartbeatInterval, heartbeatTimeout);
+
+ this.unmonitoredTargets = new ArrayBlockingQueue<>(1);
+ this.monitoredTargets = new ArrayBlockingQueue<>(1);
+ }
+
+ @Override
+ public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger log) {
+ return new RecordingHeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ resourceId,
+ heartbeatListener,
+ scheduledExecutor,
+ scheduledExecutor,
+ log,
+ unmonitoredTargets,
+ monitoredTargets);
+ }
+
+ public BlockingQueue<ResourceID> getUnmonitoredTargets() {
+ return unmonitoredTargets;
+ }
+
+ public BlockingQueue<ResourceID> getMonitoredTargets() {
+ return monitoredTargets;
+ }
+ }
+
+ /**
+ * {@link HeartbeatManagerImpl} which records the unmonitored targets.
+ */
+ private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+ private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+ private final BlockingQueue<ResourceID> monitoredTargets;
+
+ public RecordingHeartbeatManagerImpl(
+ long heartbeatTimeoutIntervalMs,
+ ResourceID ownResourceID,
+ HeartbeatListener<I, O> heartbeatListener,
+ Executor executor,
+ ScheduledExecutor scheduledExecutor,
+ Logger log,
+ BlockingQueue<ResourceID> unmonitoredTargets,
+ BlockingQueue<ResourceID> monitoredTargets) {
+ super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
+ this.unmonitoredTargets = unmonitoredTargets;
+ this.monitoredTargets = monitoredTargets;
+ }
+
+ @Override
+ public void unmonitorTarget(ResourceID resourceID) {
+ super.unmonitorTarget(resourceID);
+ unmonitoredTargets.offer(resourceID);
+ }
+
+ @Override
+ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+ super.monitorTarget(resourceID, heartbeatTarget);
+ monitoredTargets.offer(resourceID);
+ }
+ }
}
[2/3] flink git commit: [hotfix] Add not null check to message
headers in AbstractRestHandler
Posted by tr...@apache.org.
[hotfix] Add not null check to message headers in AbstractRestHandler
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a0399c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a0399c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a0399c8
Branch: refs/heads/master
Commit: 9a0399c888ae75e96c7719b8f079956462023348
Parents: 0161180
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 22 16:15:38 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:15 2018 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/rest/handler/AbstractRestHandler.java | 3 ++-
.../rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java | 3 ++-
.../job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java | 3 ++-
.../handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java | 3 ++-
4 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index abde346..408c40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
@@ -74,7 +75,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
Map<String, String> responseHeaders,
MessageHeaders<R, P, M> messageHeaders) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders);
- this.messageHeaders = messageHeaders;
+ this.messageHeaders = Preconditions.checkNotNull(messageHeaders);
}
public MessageHeaders<R, P, M> getMessageHeaders() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 895bdac..997601f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
@@ -126,7 +127,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
() -> null,
Time.milliseconds(100),
restHandlerConfiguration.getResponseHeaders(),
- null,
+ SubtaskCurrentAttemptDetailsHeaders.getInstance(),
new ExecutionGraphCache(
restHandlerConfiguration.getTimeout(),
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index abc8eec..5f03c55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
() -> null,
Time.milliseconds(100L),
restHandlerConfiguration.getResponseHeaders(),
- null,
+ SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
new ExecutionGraphCache(
restHandlerConfiguration.getTimeout(),
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 9dbc1de..d55ab77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -129,7 +130,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
() -> null,
Time.milliseconds(100L),
restHandlerConfiguration.getResponseHeaders(),
- null,
+ SubtaskExecutionAttemptDetailsHeaders.getInstance(),
new ExecutionGraphCache(
restHandlerConfiguration.getTimeout(),
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),