You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/22 21:53:20 UTC

[1/3] flink git commit: [FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest

Repository: flink
Updated Branches:
  refs/heads/master 517b3f872 -> 776af4a88


[FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest

Instead of blocking on the ChannelFuture, we register a ChannelFutureListener which
is notified once the connection attempt completes (successfully or not). This keeps
the RestClient's IO executor threads from blocking while the connection is established.

This closes #5319.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01611802
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01611802
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01611802

Branch: refs/heads/master
Commit: 016118026ca96abb691b236ca7d08db94c93684a
Parents: 517b3f8
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 18:27:22 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 16:14:35 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 41 ++++++++++++--------
 1 file changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01611802/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 71891de..5af50b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -42,6 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
@@ -181,24 +181,31 @@ public class RestClient {
 	}
 
 	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
-		return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
-			.thenApply((channel) -> {
-				try {
-					return channel.sync();
-				} catch (InterruptedException e) {
-					throw new FlinkRuntimeException(e);
+		final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
+
+		final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+
+		connectFuture.addListener(
+			(ChannelFuture future) -> {
+				if (future.isSuccess()) {
+					channelFuture.complete(future.channel());
+				} else {
+					channelFuture.completeExceptionally(future.cause());
 				}
-			})
-			.thenApply((ChannelFuture::channel))
-			.thenCompose(channel -> {
-				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-				CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-				channel.writeAndFlush(httpRequest);
-				return future;
-			}).thenComposeAsync(
+			});
+
+		return channelFuture
+			.thenComposeAsync(
+				channel -> {
+					ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+					CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+					channel.writeAndFlush(httpRequest);
+					return future;
+				},
+				executor)
+			.thenComposeAsync(
 				(JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
-				executor
-			);
+				executor);
 	}
 
 	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {


[3/3] flink git commit: [FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor

Posted by tr...@apache.org.
[FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor

This commit properly stops heartbeating with disconnected RMs and additionally
filters out outdated heartbeat timeouts that belong to old RM connections.

This closes #5318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776af4a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776af4a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776af4a8

Branch: refs/heads/master
Commit: 776af4a882c85926fc0764b702fec717c675e34c
Parents: 9a0399c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 14:56:42 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:16 2018 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  36 ++--
 .../utils/TestingResourceManagerGateway.java    |  30 +++-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 165 +++++++++++++++++++
 3 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e88cf58..0f98c49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -741,15 +741,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			}
 
 			// drop the current connection or connection attempt
-			if (resourceManagerConnection != null) {
-				resourceManagerConnection.close();
-				resourceManagerConnection = null;
-			}
+			closeResourceManagerConnection(
+				new FlinkException("New ResourceManager leader found under: " + newLeaderAddress +
+					'(' + newResourceManagerId + ')'));
 		}
 
 		// establish a connection to the new leader
 		if (newLeaderAddress != null) {
-			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+			log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
 			resourceManagerConnection =
 				new TaskExecutorToResourceManagerConnection(
 					log,
@@ -784,13 +783,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (isConnectedToResourceManager()) {
-			log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+		if (resourceManagerConnection != null) {
 
-			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+			if (resourceManagerConnection.isConnected()) {
+				log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+				resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
 
-			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+				resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+			} else {
+				log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause);
+			}
 
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
@@ -1361,11 +1364,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
 			runAsync(() -> {
-				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+				// first check whether the timeout is still valid; NOTE(review): compared null-safely since the current connection's RM id may be unset before registration - confirm
+				if (resourceManagerConnection != null && resourceId.equals(resourceManagerConnection.getResourceManagerId())) {
+					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-				closeResourceManagerConnection(
-					new TimeoutException(
-						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+					closeResourceManagerConnection(
+						new TimeoutException(
+							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+				} else {
+					log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
+				}
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index dc1635a..1428e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Implementation of the {@link ResourceManagerGateway} for testing purposes solely.
@@ -55,7 +57,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private final ResourceManagerId resourceManagerId;
 
-	private final ResourceID resourceId;
+	private final ResourceID ownResourceId;
 
 	private final long heartbeatInterval;
 
@@ -73,6 +75,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
 
+	private volatile Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
+
 	public TestingResourceManagerGateway() {
 		this(
 			ResourceManagerId.generate(),
@@ -89,7 +93,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 			String address,
 			String hostname) {
 		this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
-		this.resourceId = Preconditions.checkNotNull(resourceId);
+		this.ownResourceId = Preconditions.checkNotNull(resourceId);
 		this.heartbeatInterval = heartbeatInterval;
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -118,6 +122,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
 	}
 
+	public void setRegisterTaskExecutorFunction(Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
+		this.registerTaskExecutorFunction = registerTaskExecutorFunction;
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
 		final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
@@ -130,7 +138,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 			new JobMasterRegistrationSuccess(
 				heartbeatInterval,
 				resourceManagerId,
-				resourceId));
+				ownResourceId));
 	}
 
 	@Override
@@ -161,11 +169,17 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
-		return CompletableFuture.completedFuture(
-			new TaskExecutorRegistrationSuccess(
-				new InstanceID(),
-				resourceId,
-				heartbeatInterval));
+		final Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
+
+		if (currentFunction != null) {
+			return currentFunction.apply(Tuple5.of(taskExecutorAddress, resourceId, slotReport, dataPort, hardwareDescription));
+		} else {
+			return CompletableFuture.completedFuture(
+				new TaskExecutorRegistrationSuccess(
+					new InstanceID(),
+					ownResourceId,
+					heartbeatInterval));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2f0cbd6..0f08512 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -70,7 +70,9 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -78,10 +80,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
@@ -108,12 +112,16 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -137,11 +145,14 @@ public class TaskExecutorTest extends TestLogger {
 	private final Time timeout = Time.milliseconds(10000L);
 	private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
+	private TimerService<AllocationID> timerService;
+
 	private TestingRpcService rpc;
 
 	@Before
 	public void setup() {
 		rpc = new TestingRpcService();
+		timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
 	}
 
 	@After
@@ -150,6 +161,11 @@ public class TaskExecutorTest extends TestLogger {
 			rpc.stopService();
 			rpc = null;
 		}
+
+		if (timerService != null) {
+			timerService.stop();
+			timerService = null;
+		}
 	}
 
 	@Rule
@@ -1449,4 +1465,153 @@ public class TaskExecutorTest extends TestLogger {
 			taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
+
+	/**
+	 * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
+	 *
+	 * <p>See FLINK-8462
+	 */
+	@Test
+	public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 10000L;
+		final long pollTimeout = 1000L;
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
+		final ResourceID rmResourceID = ResourceID.generate();
+
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+		final String rmAddress = "rm";
+		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
+			ResourceManagerId.generate(),
+			rmResourceID,
+			heartbeatInterval,
+			rmAddress,
+			rmAddress);
+
+		rpc.registerGateway(rmAddress, rmGateway);
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			new TaskExecutorLocalStateStoresManager(),
+			mock(NetworkEnvironment.class),
+			haServices,
+			heartbeatServices,
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
+
+		try {
+			taskExecutor.start();
+
+			final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
+			final BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();
+
+			rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
+
+			// wait for TM registration by checking the registered heartbeat targets
+			assertThat(
+				monitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS),
+				equalTo(rmResourceID));
+
+			// let RM lose leadership
+			rmLeaderRetrievalService.notifyListener(null, null);
+
+			// the timeout should not have triggered since it is much higher
+			assertThat(unmonitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), equalTo(rmResourceID));
+
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+		}
+	}
+
+	/**
+	 * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
+	 */
+	private static final class RecordingHeartbeatServices extends HeartbeatServices {
+
+		private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+		private final BlockingQueue<ResourceID> monitoredTargets;
+
+		public RecordingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+			super(heartbeatInterval, heartbeatTimeout);
+
+			this.unmonitoredTargets = new ArrayBlockingQueue<>(1);
+			this.monitoredTargets = new ArrayBlockingQueue<>(1);
+		}
+
+		@Override
+		public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger log) {
+			return new RecordingHeartbeatManagerImpl<>(
+				heartbeatTimeout,
+				resourceId,
+				heartbeatListener,
+				scheduledExecutor,
+				scheduledExecutor,
+				log,
+				unmonitoredTargets,
+				monitoredTargets);
+		}
+
+		public BlockingQueue<ResourceID> getUnmonitoredTargets() {
+			return unmonitoredTargets;
+		}
+
+		public BlockingQueue<ResourceID> getMonitoredTargets() {
+			return monitoredTargets;
+		}
+	}
+
+	/**
+	 * {@link HeartbeatManagerImpl} which records the unmonitored targets.
+	 */
+	private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+		private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+		private final BlockingQueue<ResourceID> monitoredTargets;
+
+		public RecordingHeartbeatManagerImpl(
+				long heartbeatTimeoutIntervalMs,
+				ResourceID ownResourceID,
+				HeartbeatListener<I, O> heartbeatListener,
+				Executor executor,
+				ScheduledExecutor scheduledExecutor,
+				Logger log,
+				BlockingQueue<ResourceID> unmonitoredTargets,
+				BlockingQueue<ResourceID> monitoredTargets) {
+			super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
+			this.unmonitoredTargets = unmonitoredTargets;
+			this.monitoredTargets = monitoredTargets;
+		}
+
+		@Override
+		public void unmonitorTarget(ResourceID resourceID) {
+			super.unmonitorTarget(resourceID);
+			unmonitoredTargets.offer(resourceID);
+		}
+
+		@Override
+		public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+			super.monitorTarget(resourceID, heartbeatTarget);
+			monitoredTargets.offer(resourceID);
+		}
+	}
 }


[2/3] flink git commit: [hotfix] Add not null check to message headers in AbstractRestHandler

Posted by tr...@apache.org.
[hotfix] Add not null check to message headers in AbstractRestHandler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a0399c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a0399c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a0399c8

Branch: refs/heads/master
Commit: 9a0399c888ae75e96c7719b8f079956462023348
Parents: 0161180
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 22 16:15:38 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:15 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/handler/AbstractRestHandler.java    | 3 ++-
 .../rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java | 3 ++-
 .../job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java       | 3 ++-
 .../handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java    | 3 ++-
 4 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index abde346..408c40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
@@ -74,7 +75,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			Map<String, String> responseHeaders,
 			MessageHeaders<R, P, M> messageHeaders) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders);
-		this.messageHeaders = messageHeaders;
+		this.messageHeaders = Preconditions.checkNotNull(messageHeaders);
 	}
 
 	public MessageHeaders<R, P, M> getMessageHeaders() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 895bdac..997601f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
@@ -126,7 +127,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			Time.milliseconds(100),
 			restHandlerConfiguration.getResponseHeaders(),
-			null,
+			SubtaskCurrentAttemptDetailsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),
 				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),

http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index abc8eec..5f03c55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
 			() -> null,
 			Time.milliseconds(100L),
 			restHandlerConfiguration.getResponseHeaders(),
-			null,
+			SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),
 				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),

http://git-wip-us.apache.org/repos/asf/flink/blob/9a0399c8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 9dbc1de..d55ab77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -129,7 +130,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			Time.milliseconds(100L),
 			restHandlerConfiguration.getResponseHeaders(),
-			null,
+			SubtaskExecutionAttemptDetailsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),
 				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),