Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/20 10:22:48 UTC

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4872

    [FLINK-7876] Register TaskManagerMetricGroup under ResourceID

    ## What is the purpose of the change
    
    This commit registers TaskManagerMetricGroups under the TaskManager's
    ResourceID instead of the InstanceID. This allows the
    TaskManagerMetricGroup to be created at startup of the TaskManager.
    
    Moreover, it pulls the MetricRegistry out of JobManager and TaskManager. This
    allows the same MetricRegistry to be reused across multiple instances (e.g. in
    the FlinkMiniCluster case). It also ensures proper cleanup of a potentially
    started MetricQueryService actor.
    
    Last but not least, the PR corrects how the `MetricRegistry` is instantiated in the `ClusterEntrypoint`.
    
    ## Brief change log
    
    - Pull MetricRegistry out of `JobManager` and `TaskManager`
    - Provide `TaskManagerMetricGroup` and `JobManagerMetricGroup` at startup of the respective component
    - Register `TaskManagerMetricGroup` under the `TaskManager`'s `ResourceID`
    - Adapt `TaskManagersHandler` to serve `ResourceID` instead of `InstanceID`
    - Adapt `MetricFetcher` to retain `TaskManagers` based on `ResourceID`
    - Correct instantiation of `MetricRegistry` in `ClusterEntrypoint`
    - Make `MetricRegistry` an interface and rename the existing `MetricRegistry` class to `MetricRegistryImpl`, which becomes one implementation of the `MetricRegistry` interface
    - Introduce `NoOpMetricRegistry` implementation which simply does nothing and can be used when testing components for which one does not want to register metrics
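
A minimal sketch of the interface/implementation/no-op split described in the change log. All names and methods below (`Registry`, `RegistryImpl`, `NoOpRegistry`, `Component`, `register`, `numberOfRegisteredMetrics`) are hypothetical stand-ins chosen for illustration; they only mirror the roles of `MetricRegistry`, `MetricRegistryImpl` and `NoOpMetricRegistry` and are not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified registry abstraction (not Flink's interface).
interface Registry {
    void register(String metricName);
    int numberOfRegisteredMetrics();
}

// Default implementation that actually records what is registered.
final class RegistryImpl implements Registry {
    private final List<String> names = new ArrayList<>();

    @Override
    public void register(String metricName) {
        names.add(metricName);
    }

    @Override
    public int numberOfRegisteredMetrics() {
        return names.size();
    }
}

// No-op implementation for tests that do not care about metrics.
final class NoOpRegistry implements Registry {
    static final NoOpRegistry INSTANCE = new NoOpRegistry();

    private NoOpRegistry() {}

    @Override
    public void register(String metricName) {
        // intentionally does nothing
    }

    @Override
    public int numberOfRegisteredMetrics() {
        return 0;
    }
}

// A component typed against the interface accepts either implementation.
final class Component {
    Component(Registry registry) {
        registry.register("component.started");
    }
}
```

Because `Component` depends only on the interface, a test can pass `NoOpRegistry.INSTANCE` without starting a real registry.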
    
    ## Verifying this change
    
    Tested manually.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink taskManagerMetricGroupRegistration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4872.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4872
    
----
commit 9106445f757099763c350b96dc0a0afad8f9b081
Author: Till <ti...@gmail.com>
Date:   2017-10-18T11:50:06Z

    [FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway
    
    Add more logging to MetricFetcher

commit 577ac1d23afe9fe04d1026600e15f26270f8252e
Author: Till <ti...@gmail.com>
Date:   2017-10-19T12:51:07Z

    Fix MetricFetcherTest

commit 7371c68e09ed983e21ee46110566f60b57dcf942
Author: Till <ti...@gmail.com>
Date:   2017-10-19T15:22:53Z

    [FLINK-7876] Register TaskManagerMetricGroup under ResourceID
    
    This commit registers TaskManagerMetricGroups under the TaskManager's
    ResourceID instead of the InstanceID. This allows the
    TaskManagerMetricGroup to be created at startup of the TaskManager.
    
    Moreover, it pulls the MetricRegistry out of JobManager and TaskManager. This
    allows the same MetricRegistry to be reused across multiple instances (e.g. in
    the FlinkMiniCluster case). It also ensures proper cleanup of a potentially
    started MetricQueryService actor.
    
    Change TaskManagersHandler to work with ResourceID instead of InstanceID
    
    Adapt MetricFetcher to use ResourceID instead of InstanceID

commit 660d33ee72de982996e49a88f99591a2e2062b22
Author: Till <ti...@gmail.com>
Date:   2017-10-20T10:13:10Z

    [FLINK-7876] Properly start and shutdown MetricRegistry by ClusterEntrypoint

----


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148226952
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---
    @@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt
     import org.apache.flink.runtime.leaderelection.LeaderElectionService
     import org.apache.flink.runtime.messages.JobManagerMessages
     import org.apache.flink.runtime.messages.JobManagerMessages._
    -import org.apache.flink.runtime.metrics.MetricRegistry
    +import org.apache.flink.runtime.metrics.MetricRegistryImpl
    --- End diff --
    
    Will remove it.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148227334
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---
    @@ -28,15 +28,15 @@
     import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
     import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
     import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
    -import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryImpl;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     
     import java.util.UUID;
     
     public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
     	
    -	private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +	private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    --- End diff --
    
    good catch. Will change it.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147149524
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---
    @@ -239,7 +239,15 @@ public void shutdown() {
     
     			if (queryService != null) {
     				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    -				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +
    +				try {
    +					stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +				} catch (IllegalStateException ignored) {
    +					// this can happen if the underlying actor system has been stopped before shutting
    +					// the metric registry down
    +					// TODO: Pull the MetricQueryService actor out of the MetricRegistry
    +					LOG.debug("Cannot gracefully stop the metric query service actor.");
    --- End diff --
    
    include exception


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147654224
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---
    @@ -28,15 +28,15 @@
     import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
     import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
     import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
    -import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryImpl;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     
     import java.util.UUID;
     
     public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
     	
    -	private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    +	private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
    --- End diff --
    
    This could be a no-op registry.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148226047
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
    @@ -166,4 +169,12 @@ void notifySlotAvailable(
     	 * @return Future containing the resource overview
     	 */
     	CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout Time timeout);
    +
    +	/**
    +	 * Requests the paths for the TaskManager's {@link MetricQueryService} to query.
    +	 *
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing the collection of instance ids and the corresponding metric query service path
    --- End diff --
    
    Good catch.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147416486
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---
    @@ -239,7 +239,15 @@ public void shutdown() {
     
     			if (queryService != null) {
     				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    -				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +
    +				try {
    +					stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +				} catch (IllegalStateException ignored) {
    +					// this can happen if the underlying actor system has been stopped before shutting
    +					// the metric registry down
    +					// TODO: Pull the MetricQueryService actor out of the MetricRegistry
    +					LOG.debug("Cannot gracefully stop the metric query service actor.");
    --- End diff --
    
    Giving it a second look, I deliberately did not include the exception, because it can only happen if the `ActorSystem` has been shut down before. I'll change the debug log message instead.
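
A minimal sketch of the shutdown handling discussed in this comment, assuming (as the code comment in the diff states) that the stop call throws an `IllegalStateException` when the owning system was stopped first. `QueryServiceStub` and `RegistryShutdown` are hypothetical stand-ins, and `java.util.logging` is used only to keep the example self-contained; this is not the code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for an actor whose owning system may already be gone.
final class QueryServiceStub {
    private final boolean systemTerminated;

    QueryServiceStub(boolean systemTerminated) {
        this.systemTerminated = systemTerminated;
    }

    // Mimics a stop call that throws if the owning system was shut down first.
    CompletableFuture<Boolean> gracefulStop() {
        if (systemTerminated) {
            throw new IllegalStateException("owning system already terminated");
        }
        return CompletableFuture.completedFuture(true);
    }
}

final class RegistryShutdown {
    private static final Logger LOG = Logger.getLogger(RegistryShutdown.class.getName());

    // Returns the stop future, or a completed "false" future if the stop could not be issued.
    static CompletableFuture<Boolean> stopQueryService(QueryServiceStub queryService) {
        try {
            return queryService.gracefulStop();
        } catch (IllegalStateException ignored) {
            // Expected only when the owning system was stopped first, so the
            // stack trace is omitted and the message names the cause instead.
            LOG.log(Level.FINE,
                "Cannot gracefully stop the query service because its owning system was already shut down.");
            return CompletableFuture.completedFuture(false);
        }
    }
}
```

The trade-off is the one raised in the review: dropping the exception keeps the log quiet for an expected condition, at the cost of hiding the stack trace should the same exception ever arise for a different reason.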


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147666537
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
    @@ -166,4 +169,12 @@ void notifySlotAvailable(
     	 * @return Future containing the resource overview
     	 */
     	CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout Time timeout);
    +
    +	/**
    +	 * Requests the paths for the TaskManager's {@link MetricQueryService} to query.
    +	 *
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing the collection of instance ids and the corresponding metric query service path
    --- End diff --
    
    resource ids?


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147415810
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---
    @@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp
     		RpcService rpcService,
     		HighAvailabilityServices highAvailabilityServices,
     		HeartbeatServices heartbeatServices,
    -		MetricRegistry metricRegistry,
    +		MetricRegistryImpl metricRegistry,
    --- End diff --
    
    You're right. I'll go over the different places again and try to fix it.


---

[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4872
  
    This PR does not strictly depend on the `MetricFetcher` changes. They are included simply because I wanted to adapt the `MetricFetcher` directly, having already touched it in #4852. It only turned out later that this PR would fix FLINK-7100 as well.
    
    I actually think that #4852 can also be merged into the release branch. However, if you insist, then I can try to disentangle this PR from the `MetricFetcher` changes.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147665524
  
    --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala ---
    @@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
     import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
     import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
     import org.apache.flink.runtime.leaderelection.LeaderElectionService
    -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
    +import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}
    --- End diff --
    
    unused


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147657678
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---
    @@ -80,7 +80,6 @@
     import akka.actor.ActorRef;
     import akka.actor.ActorSystem;
     import akka.testkit.JavaTestKit;
    -
    --- End diff --
    
    completely unrelated change, revert.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4872


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148226096
  
    --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala ---
    @@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
     import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
     import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
     import org.apache.flink.runtime.leaderelection.LeaderElectionService
    -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
    +import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}
    --- End diff --
    
    Will remove it.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147655034
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---
    @@ -76,25 +74,22 @@ public void testUpdate() throws Exception {
     		JobID jobID = new JobID();
     		InstanceID tmID = new InstanceID();
    --- End diff --
    
    We no longer need this field; generate the resource ID with ResourceID#generate instead.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148226738
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() {
     	 *
     	 * @param resourceID resource ID of the task manager
     	 * @param taskManagerServicesConfiguration task manager configuration
    +	 * @param metricRegistry to register the TaskManagerMetricGroup
     	 * @return task manager components
     	 * @throws Exception
     	 */
     	public static TaskManagerServices fromConfiguration(
    --- End diff --
    
    True, I'll pull the `TaskManagerMetricGroup` instantiation out of the `TaskManagerServices#fromConfiguration`.


---

[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4872
  
    Does this PR actually depend on the MetricFetcher changes?


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147415854
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---
    @@ -239,7 +239,15 @@ public void shutdown() {
     
     			if (queryService != null) {
     				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    -				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +
    +				try {
    +					stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
    +				} catch (IllegalStateException ignored) {
    +					// this can happen if the underlying actor system has been stopped before shutting
    +					// the metric registry down
    +					// TODO: Pull the MetricQueryService actor out of the MetricRegistry
    +					LOG.debug("Cannot gracefully stop the metric query service actor.");
    --- End diff --
    
    good point


---

[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4872
  
    Thanks for the review @zentol. I've addressed your comments and rebased onto the latest master. If Travis gives the green light, I'll merge this PR.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147657172
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---
    @@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt
     import org.apache.flink.runtime.leaderelection.LeaderElectionService
     import org.apache.flink.runtime.messages.JobManagerMessages
     import org.apache.flink.runtime.messages.JobManagerMessages._
    -import org.apache.flink.runtime.metrics.MetricRegistry
    +import org.apache.flink.runtime.metrics.MetricRegistryImpl
    --- End diff --
    
    unused import


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147151661
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---
    @@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp
     		RpcService rpcService,
     		HighAvailabilityServices highAvailabilityServices,
     		HeartbeatServices heartbeatServices,
    -		MetricRegistry metricRegistry,
    +		MetricRegistryImpl metricRegistry,
    --- End diff --
    
    This should be typed to `MetricRegistry`, and I suspect this also applies to many other places.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r147656394
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -163,12 +156,14 @@ public JobLeaderService getJobLeaderService() {
     	 *
     	 * @param resourceID resource ID of the task manager
     	 * @param taskManagerServicesConfiguration task manager configuration
    +	 * @param metricRegistry to register the TaskManagerMetricGroup
     	 * @return task manager components
     	 * @throws Exception
     	 */
     	public static TaskManagerServices fromConfiguration(
    --- End diff --
    
    Seems a bit dirty that the `fromConfiguration` method accepts a non-configuration object.
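
One way to address this concern, sketched under assumptions: keep `fromConfiguration` dependent on configuration values only and let the caller create the metric group and wire it in separately. Every class below (`ServicesConfig`, `Services`, `MetricGroupStub`, `Runner`) is a hypothetical stand-in, not one of Flink's classes.

```java
// Hypothetical configuration object holding plain values.
final class ServicesConfig {
    final int numberOfSlots;

    ServicesConfig(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
    }
}

// Hypothetical services container built purely from configuration.
final class Services {
    final int numberOfSlots;

    private Services(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
    }

    // Accepts only configuration, so the method name stays accurate.
    static Services fromConfiguration(ServicesConfig config) {
        return new Services(config.numberOfSlots);
    }
}

// Hypothetical metric group keyed by a resource ID string.
final class MetricGroupStub {
    final String resourceId;

    MetricGroupStub(String resourceId) {
        this.resourceId = resourceId;
    }
}

// Hypothetical caller that owns the wiring of runtime objects.
final class Runner {
    final Services services;
    final MetricGroupStub metricGroup;

    private Runner(Services services, MetricGroupStub metricGroup) {
        this.services = services;
        this.metricGroup = metricGroup;
    }

    // The metric group is created here, next to the configured services,
    // instead of being passed into fromConfiguration.
    static Runner start(String resourceId, ServicesConfig config) {
        Services services = Services.fromConfiguration(config);
        MetricGroupStub metricGroup = new MetricGroupStub(resourceId);
        return new Runner(services, metricGroup);
    }
}
```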


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148226893
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---
    @@ -76,25 +74,22 @@ public void testUpdate() throws Exception {
     		JobID jobID = new JobID();
     		InstanceID tmID = new InstanceID();
    --- End diff --
    
    Good catch. Will change it.


---

[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4872#discussion_r148227159
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---
    @@ -80,7 +80,6 @@
     import akka.actor.ActorRef;
     import akka.actor.ActorSystem;
     import akka.testkit.JavaTestKit;
    -
    --- End diff --
    
    will do.


---