Posted to issues@flink.apache.org by wangzhijiang999 <gi...@git.apache.org> on 2017/01/18 11:19:06 UTC

[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

GitHub user wangzhijiang999 opened a pull request:

    https://github.com/apache/flink/pull/3151

    [FLINK-4364][runtime] Implement TaskManager side of heartbeat from JobM…

    1. **TaskManagerRunner** creates the **HeartbeatManagerImpl** component when it constructs the **TaskManager**.
    2. The **TaskManager** begins monitoring the **JobManager** once registration at the new job leader succeeds.
    3. Currently the heartbeat payload exchanged between the **TaskManager** and the **JobManager** is null; it can be extended later if needed. The heartbeat timeout logic is not implemented yet and may be addressed as part of the failure detection issue.
    4. The **JobManager** side will be implemented in a separate JIRA issue.
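The TaskManager-side flow in points 1 to 3 can be sketched in plain Java. This is a simplified, hypothetical model, not Flink's `HeartbeatManagerImpl`: `SimpleHeartbeatManager`, `TaskManagerSketch`, `onRegistrationSuccess` and `checkTimeouts` are illustrative names, resource IDs are plain `String`s, time is passed in explicitly, and the payload is `null` as in point 3. The timeout check is included only to make the sketch complete.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for a heartbeat manager: it remembers when each monitored
// target was last heard from and reports targets whose heartbeats are overdue.
final class SimpleHeartbeatManager {
    private final long heartbeatTimeoutMillis;
    private final Consumer<String> timeoutListener;
    // target resource ID -> time (ms) of the last heartbeat received from it
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    SimpleHeartbeatManager(long heartbeatTimeoutMillis, Consumer<String> timeoutListener) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        this.timeoutListener = timeoutListener;
    }

    // Start monitoring a target; its timeout clock starts at nowMillis.
    void monitorTarget(String resourceId, long nowMillis) {
        lastHeartbeat.put(resourceId, nowMillis);
    }

    // Record a heartbeat from a monitored target; the payload is unused (null in the PR).
    void receiveHeartbeat(String resourceId, Object payload, long nowMillis) {
        lastHeartbeat.computeIfPresent(resourceId, (id, previous) -> nowMillis);
    }

    boolean isMonitoring(String resourceId) {
        return lastHeartbeat.containsKey(resourceId);
    }

    // Stop monitoring every overdue target and notify the listener about it.
    void checkTimeouts(long nowMillis) {
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            boolean overdue = nowMillis - entry.getValue() >= heartbeatTimeoutMillis;
            if (overdue && lastHeartbeat.remove(entry.getKey(), entry.getValue())) {
                timeoutListener.accept(entry.getKey());
            }
        }
    }
}

// Hypothetical TaskManager hook: monitoring starts only after a successful
// registration at the new job leader (point 2).
final class TaskManagerSketch {
    final SimpleHeartbeatManager heartbeatManager;

    TaskManagerSketch(SimpleHeartbeatManager heartbeatManager) {
        this.heartbeatManager = heartbeatManager;
    }

    void onRegistrationSuccess(String jobManagerResourceId, long nowMillis) {
        heartbeatManager.monitorTarget(jobManagerResourceId, nowMillis);
    }
}
```

A heartbeat received at t=500 ms pushes the deadline of a 1000 ms timeout out to t=1500 ms.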
    
    @tillrohrmann, could you please review this?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangzhijiang999/flink FLINK-4364

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3151
    
----
commit 362f717b0ec50d8cc864d2f2efd432a5efbfd24a
Author: 淘江 <ta...@alibaba-inc.com>
Date:   2017-01-18T11:08:19Z

    [FLINK-4364][runtime] Implement TaskManager side of heartbeat from JobManager

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Hi @zhijiangW, I just wanted to check with you on the state of this PR.



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945400
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -105,6 +110,105 @@
     	@Rule
     	public TestName name = new TestName();
     
    +	@Test
    +	public void testHeartbeatTimeoutWithJobManager() throws Exception {
    +		final JobID jobId = new JobID();
    +		final Configuration configuration = new Configuration();
    +		final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
    +		final ResourceID tmResourceId = new ResourceID("tm");
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager = new HeartbeatManagerImpl<>(
    +				heartbeatTimeout,
    +				tmResourceId,
    +				rpc.getExecutor(),
    +				Executors.newSingleThreadScheduledExecutor(),
    +				log);
    +
    +		final String jobMasterAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
    +		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
    +		final int blobPort = 42;
    +
    +		when(jobMasterGateway.registerTaskManager(
    +				any(String.class),
    +				eq(taskManagerLocation),
    +				eq(jmLeaderId),
    +				any(Time.class)
    +		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
    +
    +		try {
    +			final TaskExecutor taskManager = new TaskExecutor(
    +					tmConfig,
    +					taskManagerLocation,
    +					rpc,
    +					mock(MemoryManager.class),
    +					mock(IOManager.class),
    +					mock(NetworkEnvironment.class),
    +					haServices,
    +					mock(MetricRegistry.class),
    +					tmHeartbeatManager,
    +					mock(TaskManagerMetricGroup.class),
    +					mock(BroadcastVariableManager.class),
    +					mock(FileCache.class),
    +					taskSlotTable,
    +					new JobManagerTable(),
    +					jobLeaderService,
    +					testingFatalErrorHandler);
    +
    +			taskManager.start();
    +
    +			rpc.registerGateway(jobMasterAddress, jobMasterGateway);
    +
    +			// we have to add the job after the TaskExecutor, because otherwise the service has not
    +			// been properly started.
    +			jobLeaderService.addJob(jobId, jobMasterAddress);
    +
    +			// now inform the task manager about the new job leader
    +			jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
    +
    +			// register task manager success will trigger monitoring heartbeat target between tm and jm
    +			verify(jobMasterGateway).registerTaskManager(
    +					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
    +
    +			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
    +			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
    +			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
    +
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(jmResourceId));
    +			assertTrue(jobManagerTable.contains(jobId));
    +			assertTrue(jobManagerConnections.containsKey(jmResourceId));
    +
    +			// the job manager will not schedule heartbeat because of mock and the task manager will be notified heartbeat timeout
    +			Thread.sleep(heartbeatTimeout);
    --- End diff --
    
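    `sleep` is not good here: it makes the test slow and, because the timeout may not have been processed yet when the sleep ends, potentially flaky.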
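One common alternative to a fixed `Thread.sleep` is to poll the expected condition until it holds or a deadline passes: the test then finishes as soon as the timeout has been handled, and still fails if it never is. A minimal sketch follows; `TestWaitUtils.waitUntil` is a hypothetical helper, not an existing Flink test utility.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper: bounded polling instead of one fixed sleep.
final class TestWaitUtils {
    private TestWaitUtils() {}

    // Polls 'condition' every 'pollMillis' until it returns true or 'timeoutMillis' elapses.
    // Returns true as soon as the condition holds, false if the deadline passed first.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis); // short poll interval, not the whole timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
    }
}
```

In this test, `Thread.sleep(heartbeatTimeout)` could become `assertTrue(TestWaitUtils.waitUntil(() -> !jobManagerTable.contains(jobId), 10_000L, 10L))`; the generous upper bound only matters when the test is about to fail.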



[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heart...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, I have pushed modifications that address all of your suggestions.
    
    The only point I am unsure about: in order to reuse the **ScheduledExecutor** of the **RpcService**, I changed the type of the **ScheduledExecutorService** used in **HeartbeatManagerImpl**; otherwise we would need another wrapper such as **ScheduledExecutorServiceAdapter**.
    
    I look forward to your further advice. Thanks in advance!
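To illustrate the trade-off: if the heartbeat manager depends on a narrow scheduling interface, the RPC service's scheduler can be passed in directly, and a plain JDK `ScheduledExecutorService` needs only a thin adapter. The sketch uses a hypothetical, trimmed-down `SimpleScheduledExecutor` interface rather than Flink's real `ScheduledExecutor`; `ScheduledExecutorServiceAdapterSketch` and `TimeoutScheduler` are likewise illustrative.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical narrow scheduling abstraction a heartbeat manager could depend on.
interface SimpleScheduledExecutor {
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
}

// Thin adapter exposing a JDK ScheduledExecutorService through the narrow interface.
final class ScheduledExecutorServiceAdapterSketch implements SimpleScheduledExecutor {
    private final ScheduledExecutorService delegate;

    ScheduledExecutorServiceAdapterSketch(ScheduledExecutorService delegate) {
        this.delegate = delegate;
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return delegate.schedule(command, delay, unit);
    }
}

// A component that only needs the narrow interface, e.g. to schedule a timeout check.
final class TimeoutScheduler {
    private final SimpleScheduledExecutor scheduler;

    TimeoutScheduler(SimpleScheduledExecutor scheduler) {
        this.scheduler = scheduler;
    }

    ScheduledFuture<?> scheduleTimeout(Runnable onTimeout, long timeoutMillis) {
        return scheduler.schedule(onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Depending on the narrow interface keeps the component agnostic of where its timer thread comes from; the cost is one small adapter class for callers that only have a JDK executor.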



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945494
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -105,6 +110,105 @@
     	[... diff context identical to the hunk quoted in the earlier comment on TaskExecutorTest.java omitted ...]
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(jmResourceId));
    +			assertTrue(jobManagerTable.contains(jobId));
    +			assertTrue(jobManagerConnections.containsKey(jmResourceId));
    +
    +			// the job manager will not schedule heartbeat because of mock and the task manager will be notified heartbeat timeout
    +			Thread.sleep(heartbeatTimeout);
    +
    +			// after heartbeat timeout
    +			assertFalse(jobManagerTable.contains(jobId));
    +			assertFalse(jobManagerConnections.containsKey(jmResourceId));
    +			verify(jobMasterGateway).disconnectTaskManager(eq(tmResourceId));
    --- End diff --
    
    Better to introduce the timeout here, in the `verify` call. That way we wait up to a given time for the `disconnectTaskManager` call to happen instead of sleeping beforehand.
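With Mockito this is usually written as `verify(jobMasterGateway, timeout(millis)).disconnectTaskManager(...)`, which waits until the interaction happens or the timeout expires. The same idea without a mocking library can be sketched with a `CountDownLatch`; `RecordingJobMasterGateway` is a hypothetical test double, not a Flink class, and it uses a `String` in place of `ResourceID`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test double that records the disconnect call the test expects.
final class RecordingJobMasterGateway {
    private final CountDownLatch disconnected = new CountDownLatch(1);
    private volatile String disconnectedTaskManager;

    // Called by the code under test when the heartbeat times out.
    void disconnectTaskManager(String taskManagerId) {
        disconnectedTaskManager = taskManagerId;
        disconnected.countDown();
    }

    // Waits at most 'timeoutMillis' for the disconnect and returns as soon as it happens.
    boolean awaitDisconnect(long timeoutMillis) {
        try {
            return disconnected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String disconnectedTaskManager() {
        return disconnectedTaskManager;
    }
}
```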



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r100034459
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
     			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
     		}
     
    +		heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
    +			@Override
    +			public void sendHeartbeat(ResourceID resourceID, Object payload) {
    +				jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Object payload) {
    +				throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
    --- End diff --
    
    `requestHeartbeat` sends a heartbeat to the target and requests an answer. `sendHeartbeat` simply sends a heartbeat to the target. You're right that `HeartbeatManagerImpl` won't call `requestHeartbeat`, but the JM should also be able to respond to a heartbeat request.
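The difference between the two operations can be illustrated with a simplified, hypothetical interface modeled on the two methods discussed here (`String` IDs, untyped payload): a `requestHeartbeat` call is answered by the receiver with a `sendHeartbeat` back to the requester, while a plain `sendHeartbeat` is only recorded. `SimpleHeartbeatTarget` and `Endpoint` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified version of the two heartbeat operations.
interface SimpleHeartbeatTarget {
    // Deliver a heartbeat to the target; no answer is expected.
    void sendHeartbeat(String fromId, Object payload);

    // Deliver a heartbeat to the target and ask it to answer with its own heartbeat.
    void requestHeartbeat(String fromId, Object payload);
}

// Illustrative endpoint that supports both operations instead of rejecting one.
final class Endpoint implements SimpleHeartbeatTarget {
    final String id;
    final List<String> received = new ArrayList<>();
    SimpleHeartbeatTarget peer; // the other side, wired up after construction

    Endpoint(String id) {
        this.id = id;
    }

    @Override
    public void sendHeartbeat(String fromId, Object payload) {
        received.add("heartbeat from " + fromId);
    }

    @Override
    public void requestHeartbeat(String fromId, Object payload) {
        received.add("request from " + fromId);
        // Answering a request means sending our own heartbeat back to the requester.
        peer.sendHeartbeat(id, null);
    }
}
```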



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102763098
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    +
    +		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
    +		libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
    +
    +		final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +		final JobManagerMetricGroup jmMetricGroup = new JobManagerMetricGroup(registry, "host");
    +
    +		try {
    +			final JobMaster jobMaster = new JobMaster(
    +					jobGraph,
    +					new Configuration(),
    +					rpc,
    +					haServices,
    +					executorService,
    +					libraryCacheManager,
    +					mock(RestartStrategyFactory.class),
    +					Time.of(10, TimeUnit.SECONDS),
    +					jmMetricGroup,
    +					jmResourceId,
    +					jmHeartbeatManager,
    +					mock(OnCompletionActions.class),
    +					testingFatalErrorHandler,
    +					libraryCacheManager.getClassLoader(jobGraph.getJobID()));
    +
    +			// also start the heartbeat manager in job manager
    +			jobMaster.start(jmLeaderId);
    +
    +			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
    +			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
    +
    +			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
    +			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
    +			final SlotPool slotPool = Whitebox.getInternalState(jobMaster, "slotPool");
    +			final HashSet<ResourceID> registeredTMsInSlotPool = Whitebox.getInternalState(slotPool, "registeredTaskManagers");
    +
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInSlotPool.contains(tmResourceId));
    +
    +			// trigger heartbeat timeout in job manager side, because the task manager will not response the heartbeat
    +			Thread.sleep(heartbeatTimeout);
    --- End diff --
    
    Sleeps in test cases are not good; they make tests slow and flaky.
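One way to avoid the sleep here is to let the test control time: if timeout checks run on a scheduler that the test drives manually, the test can advance a virtual clock by `heartbeatTimeout` and assert immediately. `ManualScheduler` and `TimeoutMonitor` are hypothetical test utilities shown only to illustrate the technique; whether `HeartbeatManagerSenderImpl` can be driven this way depends on which executor it is given.

```java
import java.util.PriorityQueue;

// Hypothetical single-threaded scheduler driven by a virtual clock.
final class ManualScheduler {
    private static final class Task {
        final long dueTime;
        final long seq; // keeps submission order for tasks with equal due times
        final Runnable action;

        Task(long dueTime, long seq, Runnable action) {
            this.dueTime = dueTime;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>((a, b) ->
            a.dueTime != b.dueTime ? Long.compare(a.dueTime, b.dueTime) : Long.compare(a.seq, b.seq));
    private long now;
    private long nextSeq;

    long now() {
        return now;
    }

    void schedule(Runnable action, long delayMillis) {
        queue.add(new Task(now + delayMillis, nextSeq++, action));
    }

    // Advances the virtual clock, running every task that becomes due, in order.
    void advanceBy(long millis) {
        final long target = now + millis;
        while (!queue.isEmpty() && queue.peek().dueTime <= target) {
            Task task = queue.poll();
            now = task.dueTime;
            task.action.run(); // may schedule further tasks
        }
        now = target;
    }
}

// Illustrative monitor: declares a timeout unless a heartbeat arrives in time.
final class TimeoutMonitor {
    private final ManualScheduler scheduler;
    private final long timeoutMillis;
    private long lastHeartbeat;
    boolean timedOut;

    TimeoutMonitor(ManualScheduler scheduler, long timeoutMillis) {
        this.scheduler = scheduler;
        this.timeoutMillis = timeoutMillis;
    }

    void start() {
        lastHeartbeat = scheduler.now();
        scheduler.schedule(this::check, timeoutMillis);
    }

    void receiveHeartbeat() {
        lastHeartbeat = scheduler.now();
    }

    private void check() {
        long elapsed = scheduler.now() - lastHeartbeat;
        if (elapsed >= timeoutMillis) {
            timedOut = true;
        } else {
            // A heartbeat arrived in the meantime: re-check when the new deadline expires.
            scheduler.schedule(this::check, timeoutMillis - elapsed);
        }
    }
}
```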



[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Hi @tillrohrmann, I submitted the modifications you suggested one week ago. Have you received them, and are there any other issues?



[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, just a reminder to review my modifications when you have time, because this PR blocks my next pull request for the heartbeat between `TaskManager` and `ResourceManager`. Thank you!



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102753723
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -703,6 +703,18 @@
     	 * Exit JVM on fatal Akka errors
     	 */
     	public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
    +
    +	// ----------------------------- Heartbeat Settings -----------------------
    +
    +	/**
    +	 * Timeout for requesting and receiving heartbeat for both sender and receiver sides
    +	 */
    +	public static final String HEARTBEAT_TIMEOUT = "heartbeat.timeout";
    +
    +	/**
    +	 * Time interval for requesting heartbeat from sender side
    +	 */
    +	public static final String HEARTBEAT_INTERVAL = "heartbeat.interval";
    --- End diff --
    
    The community decided to move away from `ConfigConstants` in favour of `ConfigOptions`. See `HighAvailabilityOptions` for how they are defined and used.
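The typed-option pattern can be sketched in a self-contained way: each option bundles its key with a typed default, so call sites stop parsing raw strings. In Flink itself the definitions would go through `ConfigOptions.key(...).defaultValue(...)` in the style of `HighAvailabilityOptions`; the `Option`, `HeartbeatOptionsSketch` and `SimpleConfig` classes and the default values below are hypothetical placeholders, not Flink's actual API or defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal typed option: a key plus a default value of type T.
final class Option<T> {
    final String key;
    final T defaultValue;

    private Option(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    static <T> Option<T> of(String key, T defaultValue) {
        return new Option<>(key, defaultValue);
    }
}

// Illustrative holder for the two settings from the diff (default values are placeholders).
final class HeartbeatOptionsSketch {
    static final Option<Long> HEARTBEAT_INTERVAL = Option.of("heartbeat.interval", 10_000L);
    static final Option<Long> HEARTBEAT_TIMEOUT = Option.of("heartbeat.timeout", 50_000L);

    private HeartbeatOptionsSketch() {}
}

// Hypothetical configuration that resolves raw string values against typed options.
final class SimpleConfig {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
        values.put(key, value);
    }

    long getLong(Option<Long> option) {
        String raw = values.get(option.key);
        return raw == null ? option.defaultValue : Long.parseLong(raw.trim());
    }
}
```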



[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, I am preparing the test code and can submit the updates this week. Thank you for your continued help!



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99826924
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    --- End diff --
    
    Hmm, it could make sense to have separate versions specialized for the JM and the RM, because you will trigger different operations. Then the TM doesn't have to make the distinction.
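Splitting the listener could look roughly like this: each connection gets a listener that already knows which kind of peer it watches, so the timeout callback needs no `ResourceID`-based dispatch. The interface, class names and the actions taken are hypothetical simplifications, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified listener: only the timeout callback is modeled.
interface TimeoutListener {
    void notifyHeartbeatTimeout(String resourceId);
}

// Illustrative TaskManager holding one specialized listener per peer type.
final class TaskManagerListeners {
    final List<String> actions = new ArrayList<>();

    // Reacts to a lost JobManager, e.g. by closing the job manager connection.
    final TimeoutListener jobManagerListener =
            resourceId -> actions.add("disconnect JobManager " + resourceId);

    // Reacts to a lost ResourceManager, e.g. by trying to reconnect to it.
    final TimeoutListener resourceManagerListener =
            resourceId -> actions.add("reconnect ResourceManager " + resourceId);
}
```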



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99564224
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
     			final TaskManagerLocation taskManagerLocation,
     			final UUID leaderId,
     			@RpcTimeout final Time timeout);
    +
    +	/**
    +	 * Send the heartbeat to job manager from task manager
    +	 *
    +	 * @param resourceID unique id of the task manager
    +	 * @param payload the payload information from the task manager
    +	 */
    +	void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
    --- End diff --
    
    The type of the payload should not be `Object`
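One way to address this is to make the payload a type parameter of the heartbeat abstraction and give the receiving side a dedicated payload class, so no casts from `Object` are needed. `HeartbeatReceiver`, `TaskManagerHeartbeatPayload` and its `runningTasks` field are hypothetical names chosen for illustration; the PR itself still passes a `null` payload.

```java
// Hypothetical generic heartbeat receiver: P is the payload type for this connection.
interface HeartbeatReceiver<P> {
    void heartbeatFrom(String resourceId, P payload);
}

// Illustrative payload type; the field is a placeholder, not part of this PR.
final class TaskManagerHeartbeatPayload {
    final int runningTasks;

    TaskManagerHeartbeatPayload(int runningTasks) {
        this.runningTasks = runningTasks;
    }
}

// Illustrative JobMaster-side receiver: the compiler enforces the payload type.
final class JobMasterHeartbeatReceiver implements HeartbeatReceiver<TaskManagerHeartbeatPayload> {
    String lastSender;
    int lastRunningTasks = -1;

    @Override
    public void heartbeatFrom(String resourceId, TaskManagerHeartbeatPayload payload) {
        lastSender = resourceId;
        lastRunningTasks = payload.runningTasks;
    }
}
```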



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99826253
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
     			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
     		}
     
    +		heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
    +			@Override
    +			public void sendHeartbeat(ResourceID resourceID, Object payload) {
    +				jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Object payload) {
    +				throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
    --- End diff --
    
    Sure, but a JM and RM should also be able to send their own heartbeat to the TM without requesting an answer, right? What I'm trying to say is that the TM should not unnecessarily constrain the way you can interact with it.
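Assuming the remote side exposes both a "push" and a "request" entry point, the task manager's target could forward both calls instead of throwing `UnsupportedOperationException`. The sketch below uses a hypothetical `Gateway` interface (its two method names are illustrative, not the actual `JobMasterGateway` API) and drops the payload parameter for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal view of the remote side's RPC gateway.
interface Gateway {
    void heartbeatFromTaskManager(String resourceId);
    void requestHeartbeatFromTaskManager(String resourceId);
}

// Mirrors the two-method shape of the target in the diff, without payloads.
interface SimpleHeartbeatTarget {
    void sendHeartbeat(String resourceId);
    void requestHeartbeat(String resourceId);
}

class ForwardingTargetSketch {
    // Both directions are forwarded instead of rejecting requestHeartbeat,
    // so the TM does not restrict how the other side interacts with it.
    static SimpleHeartbeatTarget forwardTo(Gateway gateway) {
        return new SimpleHeartbeatTarget() {
            @Override
            public void sendHeartbeat(String resourceId) {
                gateway.heartbeatFromTaskManager(resourceId);
            }

            @Override
            public void requestHeartbeat(String resourceId) {
                gateway.requestHeartbeatFromTaskManager(resourceId);
            }
        };
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        // A recording gateway that just logs which entry point was used.
        Gateway recording = new Gateway() {
            @Override
            public void heartbeatFromTaskManager(String resourceId) {
                calls.add("send:" + resourceId);
            }

            @Override
            public void requestHeartbeatFromTaskManager(String resourceId) {
                calls.add("request:" + resourceId);
            }
        };
        SimpleHeartbeatTarget target = forwardTo(recording);
        target.sendHeartbeat("tm-1");
        target.requestHeartbeat("tm-1");
        System.out.println(calls); // prints [send:tm-1, request:tm-1]
    }
}
```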



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99732610
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    +
    +		JMRMHeartbeatListener() {
    +		}
    +
    +		@Override
    +		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
    +			log.info("Notify heartbeat timeout for resourceID {}", resourceID);
    --- End diff --
    
    Yes, a timeout should actually trigger some actions. I have not submitted this part yet because I think it belongs to the failure detection logic and was meant to go into another PR. To make the heartbeat mechanism complete, I will add it in the following modifications.



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r100031031
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -112,6 +114,13 @@ public TaskManagerRunner(
     		// Initialize the TM metrics
     		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
     
    +		HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
    +				taskManagerConfiguration.getTimeout().toMilliseconds(),
    +				resourceID,
    +				executor,
    +				Executors.newSingleThreadScheduledExecutor(),
    --- End diff --
    
    Do you have any further suggestions about where it would be suitable to introduce the **ScheduledExecutorService**?
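One common pattern, sketched here as an assumption rather than the resolution adopted in the PR: let the runner own a single `ScheduledExecutorService`, inject it into the components that schedule work, and shut it down when the runner stops, instead of creating an unowned `Executors.newSingleThreadScheduledExecutor()` inline. (The later `JobMasterTest` in this thread passes `rpc.getScheduledExecutor()`, i.e. it reuses a scheduler provided by the RPC service.) `HeartbeatComponent` and `RunnerSketch` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical component that only borrows the scheduler; it never creates
// or shuts down its own thread pool.
final class HeartbeatComponent {
    private final ScheduledExecutorService scheduler;

    HeartbeatComponent(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Schedules a periodic heartbeat task on the shared scheduler.
    void start(Runnable heartbeat, long intervalMillis) {
        scheduler.scheduleAtFixedRate(heartbeat, 0L, intervalMillis, TimeUnit.MILLISECONDS);
    }
}

// Hypothetical runner that owns the scheduler's lifecycle.
final class RunnerSketch implements AutoCloseable {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final HeartbeatComponent heartbeat = new HeartbeatComponent(scheduler);

    @Override
    public void close() throws InterruptedException {
        // The owner shuts the pool down, so no thread outlives the runner.
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch beats = new CountDownLatch(3);
        try (RunnerSketch runner = new RunnerSketch()) {
            runner.heartbeat.start(beats::countDown, 10L);
            System.out.println("saw three heartbeats: " + beats.await(1, TimeUnit.SECONDS));
        }
    }
}
```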


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102925000
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -188,6 +196,8 @@ public JobMaster(
     			RestartStrategyFactory restartStrategyFactory,
     			Time rpcAskTimeout,
     			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
    +			ResourceID resourceID,
    +			HeartbeatManagerImpl heartbeatManager,
    --- End diff --
    
    Raw usage of `HeartbeatManagerImpl` is discouraged.



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102946505
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    +
    +		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
    +		libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
    +
    +		final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +		final JobManagerMetricGroup jmMetricGroup = new JobManagerMetricGroup(registry, "host");
    +
    +		try {
    +			final JobMaster jobMaster = new JobMaster(
    +					jobGraph,
    +					new Configuration(),
    +					rpc,
    +					haServices,
    +					executorService,
    +					libraryCacheManager,
    +					mock(RestartStrategyFactory.class),
    +					Time.of(10, TimeUnit.SECONDS),
    +					jmMetricGroup,
    +					jmResourceId,
    +					jmHeartbeatManager,
    +					mock(OnCompletionActions.class),
    +					testingFatalErrorHandler,
    +					libraryCacheManager.getClassLoader(jobGraph.getJobID()));
    +
    +			// also start the heartbeat manager in job manager
    +			jobMaster.start(jmLeaderId);
    +
    +			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
    +			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
    +
    +			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
    +			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
    +			final SlotPool slotPool = Whitebox.getInternalState(jobMaster, "slotPool");
    +			final HashSet<ResourceID> registeredTMsInSlotPool = Whitebox.getInternalState(slotPool, "registeredTaskManagers");
    +
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInSlotPool.contains(tmResourceId));
    +
    +			// trigger heartbeat timeout in job manager side, because the task manager will not response the heartbeat
    +			Thread.sleep(heartbeatTimeout);
    +
    +			// after heartbeat timeout
    +			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
    --- End diff --
    
    To wait for the timeout to be triggered, you could extend `HeartbeatManagerImpl` and introduce a wait mechanism in the `unmonitorTarget` method, which the `JobMaster` should call to unregister the timed-out `TaskExecutor`.
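A minimal sketch of the suggested wait mechanism, assuming a simplified manager whose `unmonitorTarget` is called once the timed-out target is unregistered. `SimpleHeartbeatManager` and `TestingHeartbeatManager` are hypothetical stand-ins for `HeartbeatManagerImpl` and a test subclass. A `CountDownLatch` released in the overridden `unmonitorTarget` replaces the fixed `Thread.sleep(heartbeatTimeout)`, so the test proceeds as soon as the unregistration happens and fails only after a bounded deadline.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, heavily simplified stand-in for HeartbeatManagerImpl.
class SimpleHeartbeatManager {
    final Map<String, Object> heartbeatTargets = new ConcurrentHashMap<>();

    void monitorTarget(String resourceId, Object target) {
        heartbeatTargets.put(resourceId, target);
    }

    void unmonitorTarget(String resourceId) {
        heartbeatTargets.remove(resourceId);
    }
}

// Test-only subclass: unmonitorTarget additionally releases a latch, so a
// test can block until the timed-out target has been unregistered.
class TestingHeartbeatManager extends SimpleHeartbeatManager {
    private final CountDownLatch unmonitored = new CountDownLatch(1);

    @Override
    void unmonitorTarget(String resourceId) {
        super.unmonitorTarget(resourceId);
        unmonitored.countDown();
    }

    // Returns true if unmonitorTarget was called before the deadline expired.
    boolean awaitUnmonitor(long timeout, TimeUnit unit) throws InterruptedException {
        return unmonitored.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        TestingHeartbeatManager manager = new TestingHeartbeatManager();
        manager.monitorTarget("tm", new Object());

        // Simulates the JobMaster reacting to a heartbeat timeout on another thread.
        new Thread(() -> manager.unmonitorTarget("tm")).start();

        boolean unregistered = manager.awaitUnmonitor(10, TimeUnit.SECONDS);
        System.out.println(unregistered + " " + manager.heartbeatTargets.containsKey("tm"));
    }
}
```

In a test like the `JobMasterTest` above, the final checks would then run after `awaitUnmonitor(...)` returns `true` instead of after a fixed sleep.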



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99732047
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    --- End diff --
    
    On the TaskExecutor side, it monitors the JM and the RM, which initiate the heartbeats, so JMRMHeartbeatListener reports a heartbeat timeout with either the JM or the RM. The current PR only shows the interaction with the JM. I planned to submit the RM part in another PR, but maybe submitting both sides together would make it easier to understand.
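The dispatch described here might be sketched as follows, assuming the listener records, for each monitored `ResourceID`, which action to run on timeout. IDs are plain `String`s, and `DispatchingHeartbeatListener`, `register` and the example actions are hypothetical; the real reactions to a JM or RM timeout were left for a follow-up PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical single listener shared by the JM and RM connections:
// each monitored ResourceID is registered together with its timeout action.
final class DispatchingHeartbeatListener {
    private final Map<String, Consumer<String>> timeoutActions = new ConcurrentHashMap<>();

    void register(String resourceId, Consumer<String> onTimeout) {
        timeoutActions.put(resourceId, onTimeout);
    }

    // Called by the heartbeat manager; the ResourceID selects the action,
    // and each action fires at most once because it is removed on timeout.
    void notifyHeartbeatTimeout(String resourceId) {
        Consumer<String> action = timeoutActions.remove(resourceId);
        if (action != null) {
            action.accept(resourceId);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DispatchingHeartbeatListener listener = new DispatchingHeartbeatListener();
        listener.register("jm-1", id -> log.add("disconnect JM " + id));
        listener.register("rm-1", id -> log.add("reconnect to RM " + id));

        listener.notifyHeartbeatTimeout("rm-1");
        listener.notifyHeartbeatTimeout("unknown"); // ignored: not monitored
        System.out.println(log); // prints [reconnect to RM rm-1]
    }
}
```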



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102762844
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    +
    +		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
    +		libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
    +
    +		final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +		final JobManagerMetricGroup jmMetricGroup = new JobManagerMetricGroup(registry, "host");
    --- End diff --
    
    I think you can also pass `null` to the `JobMaster` constructor.



[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Thanks @wangzhijiang999. Ping me once you've updated the PR. I think it would make sense to test that the respective steps are taken if the JM detects that a TM died and vice versa. For that you basically only need to start a JM and a TM and let them connect.



[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heart...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Hi @tillrohrmann, thanks for such detailed reviews! I understand all the points and will probably submit the modifications tomorrow; then I will ping you!



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99564431
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -126,6 +129,9 @@
     	/** The metric registry in the task manager */
     	private final MetricRegistry metricRegistry;
     
    +	/** The heartbeat manager for job manager and resource manager in the task manager */
    +	private final HeartbeatManagerImpl heartbeatManager;
    --- End diff --
    
    Raw usage of `HeartbeatManagerImpl`



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99564793
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    +
    +		JMRMHeartbeatListener() {
    +		}
    +
    +		@Override
    +		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
    +			log.info("Notify heartbeat timeout for resourceID {}", resourceID);
    --- End diff --
    
    A heartbeat timeout should trigger some kind of action, e.g. unregistering the JM.



[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, yes, I will submit the follow-up PRs soon after this one is merged into master, because they share some common code based on this PR. Thank you for merging!



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99827091
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    +
    +		JMRMHeartbeatListener() {
    +		}
    +
    +		@Override
    +		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
    +			log.info("Notify heartbeat timeout for resourceID {}", resourceID);
    --- End diff --
    
    I think it would be enough to create the respective handler method in the `TaskExecutor`. The actual handling can then be implemented in a separate PR. That way, we can easily test the behaviour by checking whether this method was called in case of a timeout.
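A sketch of this suggestion under stated assumptions: the listener only delegates to a dedicated handler method on the executor, and a test subclass overrides that method to record the call. `ExecutorSketch`, `handleJobManagerHeartbeatTimeout` and `RecordingExecutor` are hypothetical names; a Mockito `spy` plus `verify` would be an equivalent way to check the call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical executor: the timeout handling lives in its own method, so the
// real reaction (e.g. unregistering the JM) can be added in a later change.
class ExecutorSketch {
    // Stand-in for the inner heartbeat listener class in the diff.
    final class HeartbeatListenerSketch {
        void notifyHeartbeatTimeout(String resourceId) {
            handleJobManagerHeartbeatTimeout(resourceId);
        }
    }

    final HeartbeatListenerSketch listener = new HeartbeatListenerSketch();

    protected void handleJobManagerHeartbeatTimeout(String resourceId) {
        // Placeholder: the actual handling is to be implemented separately.
    }
}

// Test double that records every invocation of the handler.
class RecordingExecutor extends ExecutorSketch {
    final List<String> timedOut = new ArrayList<>();

    @Override
    protected void handleJobManagerHeartbeatTimeout(String resourceId) {
        timedOut.add(resourceId);
    }

    public static void main(String[] args) {
        RecordingExecutor executor = new RecordingExecutor();
        // Simulate the heartbeat manager reporting a timeout.
        executor.listener.notifyHeartbeatTimeout("jm-1");
        System.out.println(executor.timedOut); // prints [jm-1]
    }
}
```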



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99825777
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
     			final TaskManagerLocation taskManagerLocation,
     			final UUID leaderId,
     			@RpcTimeout final Time timeout);
    +
    +	/**
    +	 * Send the heartbeat to job manager from task manager
    +	 *
    +	 * @param resourceID unique id of the task manager
    +	 * @param payload the payload information from the task manager
    +	 */
    +	void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
    --- End diff --
    
    If it's `Void` at the moment, why not leave this parameter out for now?



[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102922310
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    --- End diff --
    
    Can we lower the heartbeat timeout? Then the test case won't take that long.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99825613
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -126,6 +129,9 @@
     	/** The metric registry in the task manager */
     	private final MetricRegistry metricRegistry;
     
    +	/** The heartbeat manager for job manager and resource manager in the task manager */
    +	private final HeartbeatManagerImpl heartbeatManager;
    --- End diff --
    
    No, I meant that you did not specify the generic type parameters.
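Why the missing type arguments matter can be shown with a small, self-contained sketch. It uses a hypothetical generic `Manager<I, O>` class, not Flink's actual `HeartbeatManagerImpl` (though the test diff in this thread does declare `HeartbeatManagerSenderImpl<Object, Object>` with two type arguments). A raw-typed variable compiles with only an unchecked warning and accepts a wrongly typed payload, while a parameterized one is checked by the compiler.

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeDemo {

    // Hypothetical stand-in for a heartbeat manager that is generic in the
    // payload it receives (I) and the payload it sends (O, unused here).
    static class Manager<I, O> {
        private final List<I> received = new ArrayList<>();

        void receive(I payload) {
            received.add(payload);
        }

        List<I> received() {
            return received;
        }
    }

    // Raw type: the type arguments are erased, so any Object is accepted;
    // javac only reports "rawtypes"/"unchecked" warnings (suppressed here).
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawAcceptsWrongPayload() {
        Manager raw = new Manager<Integer, Void>();
        raw.receive("not an integer"); // compiles: heap pollution
        return raw.received().get(0) instanceof String;
    }

    // Parameterized type: the compiler only accepts Integer payloads.
    static int typedSum() {
        Manager<Integer, Void> typed = new Manager<>();
        typed.receive(1);
        typed.receive(2);
        // typed.receive("x"); // would not compile
        int sum = 0;
        for (int value : typed.received()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(rawAcceptsWrongPayload()); // true
        System.out.println(typedSum()); // 3
    }
}
```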


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102759833
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---
    @@ -97,6 +98,8 @@
     	 */
     	Executor getExecutor();
     
    +	ScheduledExecutorService getScheduledExecutor();
    --- End diff --
    
    If we rebase on #3310, then we don't need this anymore.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945035
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -105,6 +110,105 @@
     	@Rule
     	public TestName name = new TestName();
     
    +	@Test
    +	public void testHeartbeatTimeoutWithJobManager() throws Exception {
    +		final JobID jobId = new JobID();
    +		final Configuration configuration = new Configuration();
    +		final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
    +		final ResourceID tmResourceId = new ResourceID("tm");
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final long heartbeatTimeout = 1000L;
    --- End diff --
    
    I think we should lower the timeout for the test.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99736386
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -126,6 +129,9 @@
     	/** The metric registry in the task manager */
     	private final MetricRegistry metricRegistry;
     
    +	/** The heartbeat manager for job manager and resource manager in the task manager */
    +	private final HeartbeatManagerImpl heartbeatManager;
    --- End diff --
    
    Yes, the current **HeartbeatManagerImpl** is sufficient, so I did not wrap it.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102924815
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    +
    +		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
    +		libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
    +
    +		final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +		final JobManagerMetricGroup jmMetricGroup = new JobManagerMetricGroup(registry, "host");
    +
    +		try {
    +			final JobMaster jobMaster = new JobMaster(
    +					jobGraph,
    +					new Configuration(),
    +					rpc,
    +					haServices,
    +					executorService,
    +					libraryCacheManager,
    +					mock(RestartStrategyFactory.class),
    +					Time.of(10, TimeUnit.SECONDS),
    +					jmMetricGroup,
    +					jmResourceId,
    +					jmHeartbeatManager,
    +					mock(OnCompletionActions.class),
    +					testingFatalErrorHandler,
    +					libraryCacheManager.getClassLoader(jobGraph.getJobID()));
    +
    +			// also start the heartbeat manager in job manager
    +			jobMaster.start(jmLeaderId);
    +
    +			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
    +			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
    +
    +			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
    +			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
    +			final SlotPool slotPool = Whitebox.getInternalState(jobMaster, "slotPool");
    +			final HashSet<ResourceID> registeredTMsInSlotPool = Whitebox.getInternalState(slotPool, "registeredTaskManagers");
    +
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
    +			assertTrue(registeredTMsInSlotPool.contains(tmResourceId));
    +
    +			// trigger heartbeat timeout in job manager side, because the task manager will not response the heartbeat
    +			Thread.sleep(heartbeatTimeout);
    +
    +			// after heartbeat timeout
    +			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
    --- End diff --
    
    We could get rid of the sleep by using `verify(taskExecutorGateway, timeout(heartbeatInterval * 5)).heartbeatFromJobManager`
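Mockito's `timeout(millis)` verification mode (the test would additionally need `import static org.mockito.Mockito.timeout;`) re-checks the interaction until it succeeds or the time runs out, so the test finishes as soon as the first heartbeat request arrives instead of always sleeping for the full `heartbeatTimeout`. The stdlib-only sketch below illustrates the same poll-until-deadline idea; `waitUntil` is a hypothetical helper, not part of Flink or Mockito.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class PollingWait {

    // Re-checks the condition every millisecond and returns true as soon as it
    // holds; once the deadline has passed, returns the result of a final check.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        final long heartbeatInterval = 10L;
        final AtomicInteger heartbeatRequests = new AtomicInteger();
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Simulated sender: one "heartbeat request" per interval.
            scheduler.scheduleAtFixedRate(
                    heartbeatRequests::incrementAndGet,
                    heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);

            // Returns as soon as the first request is observed (typically after
            // about one interval) rather than after a fixed sleep.
            boolean seen = waitUntil(() -> heartbeatRequests.get() >= 1, heartbeatInterval * 5);
            System.out.println("heartbeat request observed: " + seen);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```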


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102759091
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -732,7 +765,21 @@ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throw
     
     					slotPoolGateway.registerTaskManager(taskManagerId);
     					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
    +
    +					// monitor the task manager as heartbeat target
    +					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    +						@Override
    +						public void sendHeartbeat(ResourceID resourceID, Void payload) {
    +							// the task manager will not request heartbeat, so this method will never be called currently
    +						}
    +
    +						@Override
    +						public void requestHeartbeat(ResourceID resourceID, Void payload) {
    +							taskExecutorGateway.heartbeatFromJobManager(resourceID);
    +						}
    +					});
    +
    +					return new JMTMRegistrationSuccess(JobMaster.this.resourceID, libraryCacheManager.getBlobServerPort());
    --- End diff --
    
    `JobMaster.this.` not needed
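The Java scoping rule behind this remark, in a small self-contained sketch with illustrative names: inside an anonymous inner class, a simple field name resolves to the enclosing instance's field unless a member or parameter of the inner class shadows it. Assuming, as the review implies, that the enclosing anonymous function in the diff declares no `resourceID` of its own, the unqualified name is enough.

```java
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class OuterFieldAccess {

    // Stand-in for JobMaster's resourceID field (illustrative only).
    private final String resourceID = "jm";

    // Nothing in the anonymous class is called resourceID, so the simple name
    // already resolves to the enclosing instance's field; writing
    // OuterFieldAccess.this.resourceID here would be redundant.
    Supplier<String> unqualified() {
        return new Supplier<String>() {
            @Override
            public String get() {
                return resourceID;
            }
        };
    }

    // Here the parameter shadows the field (as in a method such as
    // sendHeartbeat(ResourceID resourceID, ...)), so the qualified form is
    // the only way to reach the enclosing instance's field.
    UnaryOperator<String> shadowed() {
        return new UnaryOperator<String>() {
            @Override
            public String apply(String resourceID) {
                return resourceID + " -> " + OuterFieldAccess.this.resourceID;
            }
        };
    }

    public static void main(String[] args) {
        OuterFieldAccess outer = new OuterFieldAccess();
        System.out.println(outer.unqualified().get()); // jm
        System.out.println(outer.shadowed().apply("tm")); // tm -> jm
    }
}
```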


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99566630
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
     			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
     		}
     
    +		heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
    +			@Override
    +			public void sendHeartbeat(ResourceID resourceID, Object payload) {
    +				jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Object payload) {
    +				throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
    --- End diff --
    
    A heartbeat request is totally fine. This should not throw an `UnsupportedOperationException` but trigger a heartbeat being sent back to `resourceID`.
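A stdlib-only sketch of the suggested behaviour follows. `HeartbeatTarget` here is a hypothetical two-method interface that mirrors the shape of the callbacks in the diff, with `String` standing in for `ResourceID`; `HeartbeatResponder` and `RecordingTarget` are illustrative names, not Flink classes. A request from a monitored origin is answered by sending a heartbeat, carrying the responder's own ID, back to that origin instead of throwing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatResponder {

    // Hypothetical mirror of the callback pair in the diff. sendHeartbeat
    // delivers a heartbeat to the remote side; requestHeartbeat asks it for one.
    interface HeartbeatTarget<I> {
        void sendHeartbeat(String origin, I payload);

        void requestHeartbeat(String origin, I payload);
    }

    private final String ownResourceId;
    private final Map<String, HeartbeatTarget<Object>> targets = new HashMap<>();

    HeartbeatResponder(String ownResourceId) {
        this.ownResourceId = ownResourceId;
    }

    void monitorTarget(String resourceId, HeartbeatTarget<Object> target) {
        targets.put(resourceId, target);
    }

    // An incoming heartbeat request is a normal event, not an error: answer it
    // by sending a heartbeat back to the requester. Returns false if the
    // requester is not monitored.
    boolean onHeartbeatRequest(String requestOrigin, Object payload) {
        HeartbeatTarget<Object> target = targets.get(requestOrigin);
        if (target == null) {
            return false;
        }
        target.sendHeartbeat(ownResourceId, payload);
        return true;
    }

    // Test double for the job manager side: records what it receives.
    static final class RecordingTarget implements HeartbeatTarget<Object> {
        final List<String> events = new ArrayList<>();

        @Override
        public void sendHeartbeat(String origin, Object payload) {
            events.add("heartbeat from " + origin);
        }

        @Override
        public void requestHeartbeat(String origin, Object payload) {
            events.add("request from " + origin);
        }
    }

    public static void main(String[] args) {
        HeartbeatResponder taskManager = new HeartbeatResponder("tm");
        RecordingTarget jobManager = new RecordingTarget();
        taskManager.monitorTarget("jm", jobManager);

        taskManager.onHeartbeatRequest("jm", null);
        System.out.println(jobManager.events); // [heartbeat from tm]
    }
}
```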


[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, thank you for the detailed review and comments!
    
    This PR only submits the heartbeat logic on the TM side, because there is already a separate JIRA issue for the JM side.
    
    In my implementation, the JM initiates the heartbeat with **HeartbeatManagerSenderImpl** and the TM responds to it with **HeartbeatManagerImpl**, so the heartbeat process is one-way.
    
    I think it is better to include the JM heartbeat logic in this PR as well, so that the whole mechanism is easier to understand. I will update this PR soon. For testing, unit tests for the basic heartbeat logic already exist. Do you mean that I should add some ITCases?
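The one-way scheme described above can be modelled roughly as follows. This is a stdlib-only sketch with hypothetical names (`SenderMonitor`, `receiveHeartbeat`, `checkTimeouts`), not the code of `HeartbeatManagerSenderImpl`. Time is passed in explicitly so the timeout behaviour can be checked deterministically, and the responding TM is simulated by calling `receiveHeartbeat` directly instead of going through RPC.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OneWayHeartbeat {

    // Sender-side bookkeeping: the sender requests heartbeats every interval
    // and remembers when each monitored target last answered.
    static final class SenderMonitor {
        private final long heartbeatTimeout;
        private final Map<String, Long> lastHeartbeat = new HashMap<>();

        SenderMonitor(long heartbeatTimeout) {
            this.heartbeatTimeout = heartbeatTimeout;
        }

        void monitorTarget(String resourceId, long now) {
            lastHeartbeat.put(resourceId, now);
        }

        // Called when a target answers a heartbeat request; unknown IDs are ignored.
        void receiveHeartbeat(String resourceId, long now) {
            lastHeartbeat.computeIfPresent(resourceId, (id, previous) -> now);
        }

        // Targets silent for longer than the timeout are reported and unmonitored.
        Set<String> checkTimeouts(long now) {
            Set<String> timedOut = new HashSet<>();
            lastHeartbeat.entrySet().removeIf(entry -> {
                boolean expired = now - entry.getValue() > heartbeatTimeout;
                if (expired) {
                    timedOut.add(entry.getKey());
                }
                return expired;
            });
            return timedOut;
        }
    }

    public static void main(String[] args) {
        final long interval = 10L;
        final long timeout = 50L;
        SenderMonitor jm = new SenderMonitor(timeout);
        jm.monitorTarget("tm-responsive", 0L);
        jm.monitorTarget("tm-silent", 0L);

        Set<String> timedOut = new HashSet<>();
        for (long now = interval; now <= 100L; now += interval) {
            // Each tick the JM requests heartbeats; only one TM answers.
            jm.receiveHeartbeat("tm-responsive", now);
            timedOut.addAll(jm.checkTimeouts(now));
        }
        System.out.println(timedOut); // [tm-silent]
    }
}
```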


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r100034748
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -112,6 +114,13 @@ public TaskManagerRunner(
     		// Initialize the TM metrics
     		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
     
    +		HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
    +				taskManagerConfiguration.getTimeout().toMilliseconds(),
    +				resourceID,
    +				executor,
    +				Executors.newSingleThreadScheduledExecutor(),
    --- End diff --
    
    Maybe we could extend the `RpcService` so that it provides a `ScheduledExecutorService`. If we see that the load on this executor is too high, then we can introduce a dedicated service. What do you think?


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99733319
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -112,6 +114,13 @@ public TaskManagerRunner(
     		// Initialize the TM metrics
     		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
     
    +		HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
    +				taskManagerConfiguration.getTimeout().toMilliseconds(),
    +				resourceID,
    +				executor,
    +				Executors.newSingleThreadScheduledExecutor(),
    --- End diff --
    
    I agree. Initially I wanted to reuse the executor in **RpcService**, but that requires some modifications. I will introduce it in the Runner so that it can be shared among components.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99565299
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -112,6 +114,13 @@ public TaskManagerRunner(
     		// Initialize the TM metrics
     		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
     
    +		HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
    +				taskManagerConfiguration.getTimeout().toMilliseconds(),
    +				resourceID,
    +				executor,
    +				Executors.newSingleThreadScheduledExecutor(),
    --- End diff --
    
    Maybe we should introduce a `ScheduledExecutorService` which can be shared across components and which is given to the respective `Runner`.
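A minimal sketch of that idea, assuming a hypothetical runner class (`SharedSchedulerRunner`) that owns a single `ScheduledExecutorService`, hands it to every component that needs timers, and shuts it down when it is closed. The components (`PeriodicComponent` is likewise illustrative) receive the executor instead of calling `Executors.newSingleThreadScheduledExecutor()` themselves, so they never shut it down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SharedSchedulerRunner implements AutoCloseable {

    // Hypothetical component that needs timers (e.g. a heartbeat manager).
    // It borrows the executor and therefore must not shut it down.
    static final class PeriodicComponent {
        private final ScheduledExecutorService scheduler;

        PeriodicComponent(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
        }

        ScheduledFuture<?> start(Runnable task, long periodMillis) {
            return scheduler.scheduleAtFixedRate(task, 0L, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    // One scheduler for the whole process, created and owned by the runner.
    // Declared first so it is initialized before the components below.
    private final ScheduledExecutorService sharedScheduler = Executors.newSingleThreadScheduledExecutor();

    final PeriodicComponent heartbeatManager = new PeriodicComponent(sharedScheduler);
    final PeriodicComponent otherComponent = new PeriodicComponent(sharedScheduler);

    boolean sharesScheduler() {
        return heartbeatManager.scheduler == otherComponent.scheduler;
    }

    ScheduledExecutorService scheduler() {
        return sharedScheduler;
    }

    @Override
    public void close() {
        // The runner created the executor, so the runner shuts it down.
        sharedScheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (SharedSchedulerRunner runner = new SharedSchedulerRunner()) {
            CountDownLatch ticks = new CountDownLatch(2);
            runner.heartbeatManager.start(ticks::countDown, 10L);
            runner.otherComponent.start(ticks::countDown, 10L);
            System.out.println("both components ran: " + ticks.await(1, TimeUnit.SECONDS));
        }
    }
}
```

A single scheduler thread is cheap but shared by everyone; if the heartbeat load turned out to be too high, a dedicated executor could be handed to that one component instead, in line with the follow-up suggestion in this thread.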


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102924568
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    --- End diff --
    
    `JobGraph` can be empty.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102762098
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    --- End diff --
    
    What is that for?


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102760735
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -212,6 +220,38 @@ public void start() throws Exception {
     
     		// start the job leader service
     		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    +
    +		// start the heartbeat manager for monitoring job manager and resource manager
    +		heartbeatManager.start(new HeartbeatListener<Void, Void>() {
    +			@Override
    +			public void notifyHeartbeatTimeout(final ResourceID resourceID) {
    +				runAsync(new Runnable() {
    +					@Override
    +					public void run() {
    +						if (jobManagerConnections.containsKey(resourceID)) {
    +							log.info("Notify heartbeat timeout with job manager {}", resourceID);
    +							JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
    +							if (jobManagerConnection != null) {
    +								closeJobManagerConnection(jobManagerConnection.getJobID());
    +							}
    +						}
    +						// TODO check whether the resource id indicates the resource manager based on resource manager connection
    +						// TODO then trigger the action of losing resource manager
    --- End diff --
    
    I assume we'll use a different `HeartbeatManager` for the resource manager heartbeats because we have different payload there.
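    A minimal sketch of that idea, under stated assumptions: `SimpleHeartbeatManager<P>` is a hypothetical, simplified stand-in, not Flink's actual `HeartbeatManager` API. Resource IDs are plain strings, and `JobManagerPayload` and `SlotReportPayload` are placeholder payload types. Each manager instance is typed by its own payload and owns its own timeout callback, so the TaskExecutor would not need to inspect a `ResourceID` to find out whether a job manager or the resource manager timed out.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    // Hypothetical, simplified heartbeat manager (not Flink's API), typed by payload P.
    final class SimpleHeartbeatManager<P> {
        private final long timeoutMillis;
        private final Consumer<String> onTimeout; // receives the timed-out resource id
        private final Map<String, Long> lastHeard = new HashMap<>();
        private final Map<String, P> lastPayload = new HashMap<>();

        SimpleHeartbeatManager(long timeoutMillis, Consumer<String> onTimeout) {
            this.timeoutMillis = timeoutMillis;
            this.onTimeout = onTimeout;
        }

        void monitorTarget(String resourceId, long nowMillis) {
            lastHeard.put(resourceId, nowMillis);
        }

        // Records a heartbeat (and its payload) only for monitored targets.
        void receiveHeartbeat(String resourceId, P payload, long nowMillis) {
            if (lastHeard.containsKey(resourceId)) {
                lastHeard.put(resourceId, nowMillis);
                lastPayload.put(resourceId, payload);
            }
        }

        // Stops monitoring every expired target, then notifies this manager's own listener.
        void checkTimeouts(long nowMillis) {
            List<String> expired = new ArrayList<>();
            for (Map.Entry<String, Long> e : lastHeard.entrySet()) {
                if (nowMillis - e.getValue() > timeoutMillis) {
                    expired.add(e.getKey());
                }
            }
            for (String id : expired) {
                lastHeard.remove(id);
                lastPayload.remove(id);
                onTimeout.accept(id);
            }
        }

        P latestPayload(String resourceId) {
            return lastPayload.get(resourceId);
        }
    }

    // Placeholder payload types for the two kinds of peers.
    final class JobManagerPayload { }
    final class SlotReportPayload { }
    ```

    With one instance per peer type, the resource manager instance's callback could trigger the "resource manager lost" handling directly, which would make the TODO about checking the resource ID unnecessary.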


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99734627
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
     			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
     		}
     
    +		heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
    +			@Override
    +			public void sendHeartbeat(ResourceID resourceID, Object payload) {
    +				jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Object payload) {
    +				throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
    --- End diff --
    
    My previous understanding was that heartbeats are only requested by the RM and JM from the TM, and that the TM only responds to them. Do you mean that heartbeats can be requested from both sides? If so, the TM would also need to schedule heartbeat requests at a fixed interval.
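    A minimal sketch of the one-directional scheme described above, under stated assumptions: `Target`, `Requester` and `Responder` are hypothetical, simplified types with string IDs. They only borrow the two method names from the diff and are not Flink's `HeartbeatTarget`. Calling `t.requestHeartbeat(...)` means "ask `t` for a heartbeat", and calling `t.sendHeartbeat(...)` means "deliver a heartbeat to `t`". The monitoring side (JM or RM) asks on every interval tick, and the TM only answers.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, simplified peer interface mirroring the two method names in the diff.
    interface Target {
        void sendHeartbeat(String fromResourceId, Object payload);
        void requestHeartbeat(String fromResourceId, Object payload);
    }

    // Monitoring side (e.g. the JobManager): issues requests and records the answers.
    final class Requester implements Target {
        final String id;
        final List<String> responses = new ArrayList<>();

        Requester(String id) {
            this.id = id;
        }

        // One tick of the interval timer: ask every monitored target for a heartbeat.
        void tick(List<Target> targets) {
            for (Target t : targets) {
                t.requestHeartbeat(id, null);
            }
        }

        @Override
        public void sendHeartbeat(String fromResourceId, Object payload) {
            responses.add(fromResourceId);
        }

        @Override
        public void requestHeartbeat(String fromResourceId, Object payload) {
            // In the one-directional scheme nobody asks the monitoring side for heartbeats.
            throw new UnsupportedOperationException("requester does not answer heartbeat requests");
        }
    }

    // Responding side (e.g. the TaskManager): only answers incoming requests.
    final class Responder implements Target {
        final String id;
        private final Target requester;

        Responder(String id, Target requester) {
            this.id = id;
            this.requester = requester;
        }

        @Override
        public void requestHeartbeat(String fromResourceId, Object payload) {
            requester.sendHeartbeat(id, null); // reply with an (for now empty) payload
        }

        @Override
        public void sendHeartbeat(String fromResourceId, Object payload) {
            // Not expected in this scheme; ignored rather than treated as an error.
        }
    }
    ```

    If heartbeats could be requested from both sides, the `Responder` would also need its own interval timer and timeout bookkeeping, which is the extra cost mentioned in the comment.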


[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    @tillrohrmann, I have pushed updates that should cover your suggestions.
    
    There are still two issues I am not sure about.
    
    First, the default values for the heartbeat interval and timeout in **ConfigConstants** have not been validated yet, so feel free to adjust them based on your experience.
    
    Second, regarding the introduction of a **ScheduledExecutorService** in **RPCService**: my initial idea was to reuse the existing scheduler of the **RPCService**, but it cannot be obtained from the **AkkaRPCService** implementation. Another option is to replace the **ScheduledExecutorService** constructor parameter of **HeartbeatManagerSenderImpl** with the **RPCService**, which can also schedule the heartbeat requests. However, the return type of the **scheduleRunnable** method in **RPCService** conflicts with the one expected by **HeartbeatManagerSenderImpl**. So for now I have added a separate single-thread pool to **RPCService**. Perhaps the number of threads in that pool could be based on the number of CPU cores.
    
    There may still be things to polish, and I am happy to make further changes based on your comments.
    BTW, I will submit the heartbeat interaction between the TM and RM in a separate PR once this one is confirmed, since the two share some common parts.
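    A minimal sketch of the scheduling part, assuming the heartbeat sender simply receives whatever `ScheduledExecutorService` the caller provides (a dedicated pool today, possibly one exposed by the RPC service later). `PeriodicHeartbeatSender` is a hypothetical name, not a Flink class. It uses only `java.util.concurrent` and keeps the returned `ScheduledFuture` so that the periodic requests can be cancelled when the sender stops.

    ```java
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sender that triggers heartbeat requests at a fixed interval on an injected scheduler.
    final class PeriodicHeartbeatSender {
        private final ScheduledExecutorService scheduler;
        private final long intervalMillis;
        private final Runnable requestHeartbeats;
        private ScheduledFuture<?> task;

        PeriodicHeartbeatSender(ScheduledExecutorService scheduler, long intervalMillis, Runnable requestHeartbeats) {
            this.scheduler = scheduler;
            this.intervalMillis = intervalMillis;
            this.requestHeartbeats = requestHeartbeats;
        }

        synchronized void start() {
            if (task == null) {
                // Fixed rate: first request round immediately, then one every intervalMillis.
                task = scheduler.scheduleAtFixedRate(requestHeartbeats, 0L, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }

        synchronized void stop() {
            if (task != null) {
                // Cancels future rounds; a round that is already running is allowed to finish.
                task.cancel(false);
                task = null;
            }
        }
    }
    ```

    Because the sender never creates its own threads, the owner of the executor decides how many threads exist, which keeps the question of sharing the RPC service's scheduler separate from the heartbeat logic.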


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102756167
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -706,7 +739,7 @@ public void failSlot(final ResourceID taskManagerId,
     
     		if (registeredTaskManagers.containsKey(taskManagerId)) {
     			final RegistrationResponse response = new JMTMRegistrationSuccess(
    -					taskManagerId, libraryCacheManager.getBlobServerPort());
    +					this.resourceID, libraryCacheManager.getBlobServerPort());
    --- End diff --
    
    `this.` should be unnecessary


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99564712
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
     		}
     	}
     
    +	/**
    +	 * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
    +	 * and trigger different processes.
    +	 */
    +	private final class JMRMHeartbeatListener implements HeartbeatListener {
    --- End diff --
    
    Why `JMRM` and not `JMTM`?


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102756036
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -511,7 +543,8 @@ public Acknowledge scheduleOrUpdateConsumers(
     
     	@RpcMethod
     	public void disconnectTaskManager(final ResourceID resourceID) {
    -		throw new UnsupportedOperationException();
    +		registeredTaskManagers.remove(resourceID);
    +		slotPoolGateway.releaseTaskManager(resourceID);
    --- End diff --
    
    I think we should also remove the resourceID (=target) from the `HeartbeatManager`.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102761465
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -775,6 +845,7 @@ private void closeJobManagerConnection(JobID jobId) {
     
     		if (jobManagerConnection != null) {
     			try {
    +				jobManagerConnections.remove(jobManagerConnection.getResourceID());
    --- End diff --
    
    We should also unmonitor the target.
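    A minimal sketch of why this matters, assuming hypothetical names: `ConnectionRegistry` models the TaskExecutor bookkeeping, and a plain `Set` of monitored IDs stands in for the heartbeat manager's targets. If closing the connection only removed it from the connection map and left the target monitored, a later heartbeat timeout could still fire for a job manager that is already gone. The same reasoning applies on the JobMaster side in `disconnectTaskManager`.

    ```java
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical model of the connection/heartbeat bookkeeping; not Flink code.
    final class ConnectionRegistry {
        private final Map<String, String> jobIdByResourceId = new HashMap<>(); // resource id -> job id
        private final Set<String> monitoredTargets = new HashSet<>();          // stand-in for heartbeat targets

        void establish(String resourceId, String jobId) {
            jobIdByResourceId.put(resourceId, jobId);
            monitoredTargets.add(resourceId); // stands in for monitoring the heartbeat target
        }

        void close(String resourceId) {
            // Both steps belong together: drop the connection AND stop monitoring the target.
            jobIdByResourceId.remove(resourceId);
            monitoredTargets.remove(resourceId); // stands in for unmonitoring the heartbeat target
        }

        // Only targets that are still monitored can produce timeout notifications.
        boolean canTimeOut(String resourceId) {
            return monitoredTargets.contains(resourceId);
        }

        boolean isConnected(String resourceId) {
            return jobIdByResourceId.containsKey(resourceId);
        }
    }
    ```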


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r100030725
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
     			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
     		}
     
    +		heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
    +			@Override
    +			public void sendHeartbeat(ResourceID resourceID, Object payload) {
    +				jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Object payload) {
    +				throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
    --- End diff --
    
    If I understand correctly, the TM can request a heartbeat from itself, and the JM can also send a heartbeat to itself?
    If that is true, **requestHeartbeat** will never be called by the current logic on the TM side. Or should I just remove the **UnsupportedOperationException** so that there is no such restriction, and leave this method empty for now?


[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Ah perfect. Sorry my bad. Will take a look and try to merge it this week :-) Thanks a lot for your work!


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102924621
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.instance.SlotPool;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.testtasks.NoOpInvokable;
    +import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.runtime.util.Hardware;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestName;
    +import org.powermock.reflect.Whitebox;
    +
    +import java.net.InetAddress;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.atLeast;
    +
    +public class JobMasterTest extends TestLogger {
    +
    +	@Rule
    +	public TestName name = new TestName();
    +
    +	@Test
    +	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final String jobManagerAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
    +
    +		final String taskManagerAddress = "tm";
    +		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
    +
    +		final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
    +				Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    +
    +		final long heartbeatInterval = 10L;
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerSenderImpl<Object, Object> jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
    +				heartbeatInterval,
    +				heartbeatTimeout,
    +				jmResourceId,
    +				executorService,
    +				rpc.getScheduledExecutor(),
    +				log);
    +
    +		final JobVertex jobVertex = new JobVertex("NoOpInvokable");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		final JobGraph jobGraph = new JobGraph("test", jobVertex);
    +
    +		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
    --- End diff --
    
    Using the `PowerMockRunner`, we could mock the `BlobLibraryCacheManager`.


[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3151
  
    Hi @zhijiangW, I'm currently merging the PR. Please also open the other PRs for the heartbeats between the RM <--> JM and RM <--> TM. This makes it easier for me to quickly merge all of them in one fell swoop.


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r99731288
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
     			final TaskManagerLocation taskManagerLocation,
     			final UUID leaderId,
     			@RpcTimeout final Time timeout);
    +
    +	/**
    +	 * Send the heartbeat to job manager from task manager
    +	 *
    +	 * @param resourceID unique id of the task manager
    +	 * @param payload the payload information from the task manager
    +	 */
    +	void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
    --- End diff --
    
    We have not yet decided which specific payload information should be attached to the heartbeat, so `Object` is used as a workaround. It should be changed to a specific type once that is settled. Which payload do you think needs to be attached at this point?


[GitHub] flink pull request #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskMana...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3151


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r100222823
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
    @@ -112,6 +114,13 @@ public TaskManagerRunner(
     		// Initialize the TM metrics
     		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
     
    +		HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
    +				taskManagerConfiguration.getTimeout().toMilliseconds(),
    +				resourceID,
    +				executor,
    +				Executors.newSingleThreadScheduledExecutor(),
    --- End diff --
    
    Yes, I totally agree. The advantage is that it avoids introducing an extra thread pool and runs all RPC messages in a single executor, provided that **RPCService** can handle this well.
