Posted to issues@flink.apache.org by zhangminglei <gi...@git.apache.org> on 2017/07/08 13:23:55 UTC

[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

GitHub user zhangminglei opened a pull request:

    https://github.com/apache/flink/pull/4289

    [FLINK-7092] [mesos] Shutdown ResourceManager components properly (FL…

    The MesosResourceManager internally starts a TaskMonitor, a LaunchCoordinator, a ConnectionMonitor and a ReconciliationCoordinator. These components have to be shut down properly when the MesosResourceManager closes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-7092-Shutdown_ResourceManager_components

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4289
    
----
commit 9084679e074d2175780f68a32d0103d13d0c2e5c
Author: zhangminglei <zm...@163.com>
Date:   2017-07-08T13:22:10Z

    [FLINK-7092] [mesos] Shutdown ResourceManager components properly (FLIP-6)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4289#discussion_r129522676
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -312,6 +312,48 @@ private void recoverWorkers() throws Exception {
     	}
     
     	@Override
    +	public void shutDown() throws Exception {
    +		// shut down all components
    +		if (taskMonitor != null) {
    +			try {
    +				actorSystem.stop(taskMonitor);
    +			} catch (Throwable tt) {
    +				LOG.error("Failed to stop taskMonitor", tt);
    --- End diff --
    
    Instead of only logging them, we should add the caught exceptions as suppressed exceptions to a single overall exception. See `shutDownApplication` for details.
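
The suggestion above can be illustrated with a minimal, self-contained sketch. It is not the code from `shutDownApplication`; the `Stoppable` interface and the local `firstOrSuppressed` helper are hypothetical stand-ins, with the helper loosely modeled on the `ExceptionUtils.firstOrSuppressed` call that appears in the later revision of the diff. The idea is to attempt every stop, keep the first failure, attach later failures via `Throwable.addSuppressed`, and rethrow once at the end.

```java
public class SuppressedShutdownSketch {

    /** Hypothetical stand-in for anything that can fail while stopping. */
    public interface Stoppable {
        void stop() throws Exception;
    }

    /** Keeps the first failure and attaches every later one as suppressed. */
    public static Exception firstOrSuppressed(Exception newer, Exception previous) {
        if (previous == null) {
            return newer;
        }
        previous.addSuppressed(newer);
        return previous;
    }

    /** Attempts to stop every component, then rethrows one aggregated exception. */
    public static void stopAll(Stoppable... components) throws Exception {
        Exception collected = null;
        for (Stoppable component : components) {
            try {
                component.stop();
            } catch (Exception e) {
                // Do not abort: remember the failure and continue with the rest.
                collected = firstOrSuppressed(e, collected);
            }
        }
        if (collected != null) {
            throw collected;
        }
    }

    public static void main(String[] args) {
        try {
            stopAll(
                () -> { throw new IllegalStateException("taskMonitor"); },
                () -> { },
                () -> { throw new IllegalStateException("launchCoordinator"); });
        } catch (Exception e) {
            System.out.println(e.getMessage() + " suppressed=" + e.getSuppressed().length);
        }
    }
}
```

Compared with logging inside each `catch` block, the caller receives a single exception that still carries every individual failure.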



[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4289#discussion_r129522998
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -312,6 +312,48 @@ private void recoverWorkers() throws Exception {
     	}
     
     	@Override
    +	public void shutDown() throws Exception {
    +		// shut down all components
    +		if (taskMonitor != null) {
    +			try {
    +				actorSystem.stop(taskMonitor);
    --- End diff --
    
    We should try to stop the actors gracefully. I propose using `Patterns.gracefulStop`, collecting the returned futures for all actors, combining them, and then waiting on their completion with a timeout.
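
The shape of that proposal can be sketched without Akka. The example below is only an analogue: it uses `java.util.concurrent.CompletableFuture` in place of the Scala futures returned by `Patterns.gracefulStop`, and the `Worker` interface is a hypothetical stand-in for an actor. It requests a stop on every worker first, combines the returned futures, and waits once with a timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracefulStopSketch {

    /** Hypothetical stand-in for an actor that stops asynchronously. */
    public interface Worker {
        /** Requests a stop; the future completes once the worker has terminated. */
        CompletableFuture<Boolean> gracefulStop();
    }

    /** Returns true only if every worker stopped within the timeout. */
    public static boolean stopAll(List<Worker> workers, long timeout, TimeUnit unit) {
        // Step 1: trigger all stops before waiting on any of them.
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (Worker worker : workers) {
            futures.add(worker.gracefulStop());
        }
        // Step 2: combine the futures into a single one.
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // Step 3: wait once, bounded by the timeout.
        try {
            all.get(timeout, unit);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Worker fast = () -> CompletableFuture.completedFuture(true);
        Worker stuck = () -> new CompletableFuture<Boolean>(); // never completes
        System.out.println(stopAll(Arrays.asList(fast, fast), 1, TimeUnit.SECONDS));
        System.out.println(stopAll(Arrays.asList(fast, stuck), 100, TimeUnit.MILLISECONDS));
    }
}
```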



[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4289#discussion_r130078259
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -312,6 +318,114 @@ private void recoverWorkers() throws Exception {
     	}
     
     	@Override
    +	public void shutDown() throws Exception {
    +		// shut down all components
    +		Future<Boolean> stopTaskMonitorFuture = null;
    +		Future<Boolean> stopConnectionMonitorFuture = null;
    +		Future<Boolean> stopLaunchCoordinatorFuture = null;
    +		Future<Boolean> stopReconciliationCoordinatorFuture = null;
    +
    +		FiniteDuration stopTimeout = null;
    +
    +		Exception exception = null;
    +
    +		if (taskMonitor != null) {
    +			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +			stopTaskMonitorFuture = Patterns.gracefulStop(taskMonitor, stopTimeout);
    +		}
    +
    +		if (stopTaskMonitorFuture != null) {
    +			boolean stopped = false;
    +
    +			try {
    +				stopped = Await.result(stopTaskMonitorFuture, stopTimeout);
    +			} catch (Exception ex) {
    +				exception = new Exception("TaskMonitor actor did not properly stop.", ex);
    +			}
    +
    +			if (!stopped) {
    +				// the taskMonitor actor did not stop in time, let's kill him
    +				taskMonitor.tell(Kill.getInstance(), ActorRef.noSender());
    +			}
    +		}
    +
    +		if (connectionMonitor != null) {
    +			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +			stopConnectionMonitorFuture = Patterns.gracefulStop(connectionMonitor, stopTimeout);
    +		}
    +
    +		if (stopConnectionMonitorFuture != null) {
    +			boolean stopped = false;
    +
    +			try {
    +				stopped = Await.result(stopConnectionMonitorFuture, stopTimeout);
    +			} catch (Exception ex) {
    +				exception = ExceptionUtils.firstOrSuppressed(
    +					new Exception("ConnectionMonitor actor did not properly stop.", ex),
    +					exception
    +				);
    +			}
    +
    +			if (!stopped) {
    +				// the connectionMonitor actor did not stop in time, let's kill him
    +				connectionMonitor.tell(Kill.getInstance(), ActorRef.noSender());
    +			}
    +		}
    +
    +		if (launchCoordinator != null) {
    +			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +			stopLaunchCoordinatorFuture = Patterns.gracefulStop(launchCoordinator, stopTimeout);
    +		}
    +
    +		if (stopLaunchCoordinatorFuture != null) {
    +			boolean stopped = false;
    +
    +			try {
    +				stopped = Await.result(stopLaunchCoordinatorFuture, stopTimeout);
    +			} catch (Exception ex) {
    +				exception = ExceptionUtils.firstOrSuppressed(
    +					new Exception("LaunchCoordinator actor did not properly stop.", ex),
    +					exception
    +				);
    +			}
    +
    +			if (!stopped) {
    +				// the launchCoordinator actor did not stop in time, let's kill him
    +				launchCoordinator.tell(Kill.getInstance(), ActorRef.noSender());
    +			}
    +		}
    +
    +		if (reconciliationCoordinator != null) {
    +			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +			stopReconciliationCoordinatorFuture = Patterns.gracefulStop(reconciliationCoordinator, stopTimeout);
    +		}
    +
    +		if (stopReconciliationCoordinatorFuture != null) {
    +			boolean stopped = false;
    +
    +			try {
    +				stopped = Await.result(stopReconciliationCoordinatorFuture, stopTimeout);
    +			} catch (Exception ex) {
    +				exception = ExceptionUtils.firstOrSuppressed(
    +					new Exception("ReconciliationCoordinator actor did not properly stop.", ex),
    +					exception
    +				);
    +			}
    +
    +			if (!stopped) {
    +				// the reconciliationCoordinator actor did not stop in time, let's kill him
    +				reconciliationCoordinator.tell(Kill.getInstance(), ActorRef.noSender());
    +			}
    --- End diff --
    
    I think we can combine the different futures into a single future, so that we wait on all of the shutdowns concurrently instead of one after another.
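
One way to keep the per-actor "kill if it did not stop" step from the diff while still waiting concurrently is sketched below. This is an illustration under assumptions, not the merged code: `CompletableFuture` stands in for the Akka futures, and the map keys are just labels for the four components. After a single bounded wait on the combined future, each individual future is inspected to find the components that still need to be killed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class KillStragglersSketch {

    /** Returns the names of components whose stop future did not succeed in time. */
    public static List<String> stragglers(
            Map<String, CompletableFuture<Boolean>> stopFutures, long timeoutMillis) {
        CompletableFuture<Void> all = CompletableFuture.allOf(
            stopFutures.values().toArray(new CompletableFuture<?>[0]));
        try {
            // One wait for everything: total time is bounded by a single timeout.
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Fall through and inspect each future individually.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> notStopped = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Boolean>> entry : stopFutures.entrySet()) {
            CompletableFuture<Boolean> future = entry.getValue();
            if (!future.isDone() || future.isCompletedExceptionally()) {
                notStopped.add(entry.getKey());
            }
        }
        return notStopped;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Boolean>> futures = new LinkedHashMap<>();
        futures.put("taskMonitor", CompletableFuture.completedFuture(true));
        futures.put("launchCoordinator", new CompletableFuture<Boolean>()); // never completes
        // In the real code, each name returned here would be sent a Kill message.
        System.out.println(stragglers(futures, 100));
    }
}
```

With four components and a one-second timeout each, the sequential version in the diff can block for up to four seconds, whereas a combined wait is bounded by roughly one.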



[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4289



[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4289#discussion_r129522795
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -312,6 +312,48 @@ private void recoverWorkers() throws Exception {
     	}
     
     	@Override
    +	public void shutDown() throws Exception {
    +		// shut down all components
    +		if (taskMonitor != null) {
    +			try {
    +				actorSystem.stop(taskMonitor);
    +			} catch (Throwable tt) {
    +				LOG.error("Failed to stop taskMonitor", tt);
    +			}
    +		}
    +
    +		if (connectionMonitor != null) {
    +			try {
    +				actorSystem.stop(connectionMonitor);
    +			} catch (Throwable tt) {
    +				LOG.error("Failed to stop connectionMonitor", tt);
    +			}
    +		}
    +
    +		if (launchCoordinator != null) {
    +			try {
    +				actorSystem.stop(launchCoordinator);
    +			} catch (Throwable tt) {
    +				LOG.error("Failed to stop launchCoordinator", tt);
    +			}
    +		}
    +
    +		if (reconciliationCoordinator != null) {
    +			try {
    +				actorSystem.stop(reconciliationCoordinator);
    +			} catch (Throwable tt) {
    +				LOG.error("Failed to stop reconciliationCoordinator", tt);
    +			}
    +		}
    +
    +		if (actorSystem != null) {
    +			actorSystem.shutdown();
    +			actorSystem.awaitTermination();
    +		}
    --- End diff --
    
    We cannot shut down the `ActorSystem` here because it may be used by others outside of the `MesosResourceManager`.
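
The ownership rule behind this comment can be shown with a small analogue. The sketch below assumes nothing about Flink or Akka: an `ExecutorService` plays the role of the shared `ActorSystem`, and `Manager` is a hypothetical component that receives it from its caller. The component stops only what it created itself and leaves the borrowed resource for its owner to shut down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedResourceSketch {

    /** Hypothetical component that borrows an executor it did not create. */
    public static final class Manager {
        private final ExecutorService shared; // owned by the caller, not by this class
        private boolean running = true;

        public Manager(ExecutorService shared) {
            this.shared = shared;
        }

        /** Runs a task on the borrowed executor while the manager is active. */
        public void submit(Runnable task) {
            if (running) {
                shared.execute(task);
            }
        }

        /** Stops only this manager's own state; the borrowed executor stays up. */
        public void shutDown() {
            running = false;
        }

        public boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Manager manager = new Manager(pool);
        manager.shutDown();
        // Other users of the pool are unaffected by the manager's shutdown.
        System.out.println("pool shut down: " + pool.isShutdown());
        pool.shutdown(); // the owner decides when the shared resource goes away
    }
}
```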



[GitHub] flink issue #4289: [FLINK-7092] [mesos] Shutdown ResourceManager components ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/4289
  
    Thanks for your generous review, @tillrohrmann. I really appreciate it! The PR has been updated again; please help review it. If I got anything wrong, please point it out. Peace!

