Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/10/16 15:01:04 UTC

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4840

    [FLINK-7368][metrics] Make MetricStore ThreadSafe class

    Remove external synchronisation on MetricStore
    
    ## What is the purpose of the change
    
    This is a refactor that makes `MetricStore` thread safe. It achieves this by pulling all modifying methods into the class and by returning immutable copies from getters.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `MetricStoreTest` or `MetricFetcherTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink flink7368

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4840
    
----
commit e49438e76c1ffb92b24837d8b958ae491bb0f9ca
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-16T14:53:14Z

    [FLINK-7368][metrics] Make MetricStore ThreadSafe class
    
    Remove external synchronization on MetricStore

----


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146627036
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    If the returned list is a fresh copy independent of the map, it can be modifiable; the user can do whatever he wants with it.


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146825012
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    +		taskManagers.keySet().retainAll(activeTaskManagers);
    +	}
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeJobs to retain.
    +	 */
    +	public synchronized void retainJobs(List<String> activeJobs) {
    +		jobs.keySet().retainAll(activeJobs);
    +	}
    +
    +	/**
    +	 * Add metric dumps to the store.
    +	 *
    +	 * @param metricDumps to add.
    +	 */
    +	public synchronized void addAll(List<MetricDump> metricDumps) {
    +		for (MetricDump metric : metricDumps) {
    +			add(metric);
    +		}
    +	}
     
     	// -----------------------------------------------------------------------------------------------------------------
    -	// Adding metrics
    +	// Accessors for sub MetricStores
     	// -----------------------------------------------------------------------------------------------------------------
    -	public void add(MetricDump metric) {
    +
    +	/**
    +	 * Returns the {@link ComponentMetricStore}.
    +	 *
    +	 * @return JobManagerMetricStore
    --- End diff --
    
    outdated class reference


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4840


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146582951
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    In which case we should rather modify the store to not allow writes in the first place, instead of opting for unmodifiable collections that are pretty much a hack. "Here, have an object that fails for 50% of the defined methods"; that's hardly good design, is it?
    
    Till suggested dedicated read methods that return metrics, something like `List<String> getMetrics(List<String> metricNames)` instead of exposing the metric maps.
    
    This would make the interface cleaner and would allow us to simplify the synchronization.
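
A minimal sketch of what such a dedicated read method could look like, assuming a simplified store that holds only a flat `Map<String, String>`. The class name `DedicatedReadStore`, the `put` method, and the choice to skip unknown metric names are hypothetical and not taken from the PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified store: the backing map never leaves the class. */
final class DedicatedReadStore {
    private final Map<String, String> metrics = new HashMap<>();

    /** All writes go through a synchronized method. */
    synchronized void put(String name, String value) {
        metrics.put(name, value);
    }

    /**
     * Returns a fresh list with the values of the requested metrics.
     * Unknown names are skipped (an assumption made for this sketch).
     */
    synchronized List<String> getMetrics(List<String> metricNames) {
        List<String> result = new ArrayList<>(metricNames.size());
        for (String name : metricNames) {
            String value = metrics.get(name);
            if (value != null) {
                result.add(value);
            }
        }
        return result;
    }
}
```

Because the returned list is a new object, callers may modify it freely without affecting the store, and the store needs only one lock and a plain `HashMap`.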
    
    



---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146819586
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    Then you pay the cost of copying all of the objects, which is much higher than the cost of synchronisation. But yes, you can do that. In that case you don't need concurrent hash maps.
    
    If you intend to go in another direction with this and leave the current code in master as it is, please log the follow-up work in Jira after the release (I will close the release blocker issue for which I started this PR). However, I have the impression that any follow-up work would be easier on top of this change, and in the meantime this is still way better than the current code on the master branch with its external synchronisation.


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146825259
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -181,74 +265,23 @@ private void addMetric(Map<String, String> target, String name, MetricDump metri
     	}
     
     	// -----------------------------------------------------------------------------------------------------------------
    -	// Accessors for sub MetricStores
    +	// sub MetricStore classes
     	// -----------------------------------------------------------------------------------------------------------------
     
     	/**
    -	 * Returns the {@link JobManagerMetricStore}.
    -	 *
    -	 * @return JobManagerMetricStore
    -	 */
    -	public JobManagerMetricStore getJobManagerMetricStore() {
    -		return jobManager;
    -	}
    -
    -	/**
    -	 * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
    -	 *
    -	 * @param tmID taskmanager ID
    -	 * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
    +	 * Structure containing metrics of a single component.
     	 */
    -	public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
    -		return taskManagers.get(tmID);
    -	}
    +	@ThreadSafe
    +	public static class ComponentMetricStore {
    +		public final Map<String, String> metrics;
     
    -	/**
    -	 * Returns the {@link JobMetricStore} for the given job ID.
    -	 *
    -	 * @param jobID job ID
    -	 * @return JobMetricStore for the given ID, or null if no store for the given argument exists
    -	 */
    -	public JobMetricStore getJobMetricStore(String jobID) {
    -		return jobs.get(jobID);
    -	}
    -
    -	/**
    -	 * Returns the {@link TaskMetricStore} for the given job/task ID.
    -	 *
    -	 * @param jobID  job ID
    -	 * @param taskID task ID
    -	 * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
    -	 */
    -	public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
    -		JobMetricStore job = getJobMetricStore(jobID);
    -		if (job == null) {
    -			return null;
    +		public ComponentMetricStore() {
    --- End diff --
    
    all constructors can be private


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146576511
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    It's a matter of clean design and of maintaining it in the future. You only *know* that we do not modify this map **at the moment**. Are you sure that every committer reviewing changes in these parts of the code will always spot any such modification without the `unmodifiable` assertion? On the other hand, removing `unmodifiable` in a PR is a big yellow warning light that something is going on.
    
    Following this logic, why don't we make all methods/fields `public`, since we know that we will never try to access fields that are supposed to be `private`?


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski closed the pull request at:

    https://github.com/apache/flink/pull/4840


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146594528
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    Would you rather have a runtime error about illegal/unintended write access or none at all? Java sucks in this regard (missing const-correctness) :(
    
    Exposing `List<...>` has the same issue.
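
To illustrate the "runtime error" being referred to, here is a small sketch of how a view created with `Collections.unmodifiableMap` behaves: reads pass through to the backing map, while writes fail with `UnsupportedOperationException`. The class name `UnmodifiableViewDemo` and the sample key are made up for this example:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class UnmodifiableViewDemo {
    /** Returns true if a write through the given map is rejected at runtime. */
    static boolean writeIsRejected(Map<String, String> view) {
        try {
            view.put("k", "v");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> backing = new ConcurrentHashMap<>();
        Map<String, String> view = Collections.unmodifiableMap(backing);

        backing.put("gc", "G1");
        // The view is not a copy: it reflects later changes to the backing map.
        System.out.println(view.get("gc"));
        // Writing through the view fails at runtime, not at compile time.
        System.out.println(writeIsRejected(view));
    }
}
```

The check happens only when the write is attempted, which is the trade-off under discussion: an unintended write is caught, but not by the compiler.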


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146552100
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    --- End diff --
    
    but now we're paying the synchronization costs twice, once for the synchronized keyword and once again for every access to the map. If every method is synchronized the maps don't have to be concurrent hash maps.
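
A sketch of the alternative described here, under the assumption that every access goes through a `synchronized` method and the map itself never escapes the class. The class `SynchronizedOnlyStore` and its `put`/`size` helpers are hypothetical simplifications, with `String` values standing in for `TaskManagerMetricStore`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical store guarded by a single lock; a plain HashMap is enough here. */
final class SynchronizedOnlyStore {
    private final Map<String, String> taskManagers = new HashMap<>();

    synchronized void put(String id, String value) {
        taskManagers.put(id, value);
    }

    /** Drops every task manager whose ID is not in the given list. */
    synchronized void retainTaskManagers(List<String> activeTaskManagers) {
        taskManagers.keySet().retainAll(activeTaskManagers);
    }

    synchronized int size() {
        return taskManagers.size();
    }
}
```

This stays correct only while no method hands out a reference to `taskManagers`; once a getter exposes the map, callers can touch it without holding the lock.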


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146574784
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    --- End diff --
    
    1. Performance here is not that big a deal, but code correctness is. With `synchronized` it is just easier to implement any changes here in a thread-safe manner. Without it, any new developer coming here will have to understand many more assumptions about this code, such as whether consistency matters here or not, whether the order of operations or of access to the fields is important, etc.
    
    2. Even with `synchronized` we still need concurrent hash maps, because we return them from getters and thereby make them visible to the outside world. So we either need concurrent hash maps or have to return copies of them.
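
The "return copies" option from point 2 could be sketched as follows. `SnapshotStore` and its method names are hypothetical; the point is only that the copy is taken while holding the lock, so the caller never sees the internal `HashMap`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store that hands out copies instead of its internal map. */
final class SnapshotStore {
    private final Map<String, String> metrics = new HashMap<>();

    synchronized void put(String name, String value) {
        metrics.put(name, value);
    }

    /** Copies the map under the lock; the result is independent of later writes. */
    synchronized Map<String, String> snapshot() {
        return new HashMap<>(metrics);
    }
}
```

The price is one full copy per call, which is the cost the other option (concurrent maps behind unmodifiable views) avoids.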


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r145611565
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    --- End diff --
    
    Strictly speaking right now it isn't. However, without synchronising those methods, implementing them is much harder. For example, I made one mistake where I implemented:
    ```
    if (!jobs.containsKey(jobID)) {
      return null;
    }
    return jobs.get(jobID).getTaskMetricStore(taskID);
    ```
    Also, without synchronising it's harder to understand the correctness of code that handles three separate fields (whether this is a correct access order; whether the state of the variables is/should be coherent, e.g. whether the existence of a key in one map should imply the existence of a key in another, etc.).
    
    Long story short, synchronising is not an issue here, but helps with reasoning about this code.
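
The snippet above presumably goes wrong because the key check and the `get` are two separate reads of the map: another thread may remove the job in between, so `get` returns `null` and the chained call throws. A minimal sketch of a single-read variant, with nested maps standing in for `JobMetricStore` (the class `SingleLookup` and the method `lookup` are hypothetical):

```java
import java.util.Map;

final class SingleLookup {
    /**
     * Reads the outer map exactly once, so a concurrent removal of jobID
     * between a check and a second read cannot cause a NullPointerException.
     */
    static String lookup(Map<String, Map<String, String>> jobs, String jobID, String taskID) {
        Map<String, String> job = jobs.get(jobID);
        if (job == null) {
            return null;
        }
        return job.get(taskID);
    }
}
```

With a `synchronized` method the original two-step version would also be safe, which is the argument being made: the lock removes the need to reason about such interleavings.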


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146831312
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java ---
    @@ -79,7 +79,7 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche
     				fetcher.update();
     				MetricStore metricStore = fetcher.getMetricStore();
     				synchronized (metricStore) {
    --- End diff --
    
    Don't know why I have missed it :(


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146824504
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java ---
    @@ -79,7 +79,7 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche
     				fetcher.update();
     				MetricStore metricStore = fetcher.getMetricStore();
     				synchronized (metricStore) {
    --- End diff --
    
    leftover synchronization


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146824632
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    +		taskManagers.keySet().retainAll(activeTaskManagers);
    +	}
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeJobs to retain.
    +	 */
    +	public synchronized void retainJobs(List<String> activeJobs) {
    --- End diff --
    
    can be package private


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146824968
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    +		taskManagers.keySet().retainAll(activeTaskManagers);
    +	}
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeJobs to retain.
    +	 */
    +	public synchronized void retainJobs(List<String> activeJobs) {
    +		jobs.keySet().retainAll(activeJobs);
    +	}
    +
    +	/**
    +	 * Add metric dumps to the store.
    +	 *
    +	 * @param metricDumps to add.
    +	 */
    +	public synchronized void addAll(List<MetricDump> metricDumps) {
    +		for (MetricDump metric : metricDumps) {
    +			add(metric);
    +		}
    +	}
     
     	// -----------------------------------------------------------------------------------------------------------------
    -	// Adding metrics
    +	// Accessors for sub MetricStores
     	// -----------------------------------------------------------------------------------------------------------------
    -	public void add(MetricDump metric) {
    +
    +	/**
    +	 * Returns the {@link ComponentMetricStore}.
    --- End diff --
    
    The Javadoc is missing any mention of the JobManager.


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146825384
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    --- End diff --
    
    This can be private.


---

[GitHub] flink issue #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe class

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4840
  
    Thanks!


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146553570
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
     				? value
     				: defaultValue;
     		}
    -	}
     
    -	/**
    -	 * Sub-structure containing metrics of the JobManager.
    -	 */
    -	public static class JobManagerMetricStore extends ComponentMetricStore {
    +		public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +			if (source == null) {
    +				return null;
    +			}
    +			return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +		}
     	}
     
     	/**
     	 * Sub-structure containing metrics of a single TaskManager.
     	 */
    +	@ThreadSafe
     	public static class TaskManagerMetricStore extends ComponentMetricStore {
    -		public final Set<String> garbageCollectorNames = new HashSet<>();
    +		public final Set<String> garbageCollectorNames;
    +
    +		public TaskManagerMetricStore() {
    +			this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +		}
    +
    +		public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +			super(metrics);
    +			this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +		}
     
     		public void addGarbageCollectorName(String name) {
     			garbageCollectorNames.add(name);
     		}
    +
    +		public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    I'm wondering whether we really need this unmodifiable business. Yes, it's technically a good idea, but access to the MetricStore is limited and fully under our control, so we _know_ that we never try to modify the map.
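For readers weighing this trade-off, here is a minimal sketch of what an unmodifiable wrapper does and does not guarantee. The `Store` class below is a hypothetical stand-in, not Flink's `ComponentMetricStore`; only `Collections.unmodifiableMap` and `ConcurrentHashMap` are real JDK APIs. The point it illustrates is that the wrapper is a read-only view rather than a copy: writes through the view are rejected, while later writes to the backing map remain visible.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UnmodifiableViewSketch {

    /** Hypothetical stand-in for a component metric store; not the Flink class. */
    static final class Store {
        final Map<String, String> metrics;

        Store(Map<String, String> metrics) {
            this.metrics = metrics;
        }

        /** Wraps the source's map in a read-only view; null-tolerant like the diff above. */
        static Store unmodifiable(Store source) {
            if (source == null) {
                return null;
            }
            return new Store(Collections.unmodifiableMap(source.metrics));
        }
    }

    public static void main(String[] args) {
        Store internal = new Store(new ConcurrentHashMap<>());
        internal.metrics.put("numRecordsIn", "1");

        Store view = Store.unmodifiable(internal);

        boolean rejected = false;
        try {
            view.metrics.put("illegal", "write");
        } catch (UnsupportedOperationException e) {
            rejected = true; // writes through the view are refused
        }

        // The view is not a snapshot: this write to the backing map shows up in it.
        internal.metrics.put("numRecordsOut", "2");

        System.out.println(rejected + " " + view.metrics.size()); // prints "true 2"
    }
}
```

If every caller is trusted never to write, the wrapper only buys a fail-fast check, which is the cost/benefit being questioned in the comment above.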


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by pnowojski <gi...@git.apache.org>.
GitHub user pnowojski reopened a pull request:

    https://github.com/apache/flink/pull/4840

    [FLINK-7368][metrics] Make MetricStore ThreadSafe class

    Remove external synchronisation on MetricStore
    
----
commit b19077430808923e047b808f68efd47f4c9527e0
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-16T14:53:14Z

    [FLINK-7368][metrics] Make MetricStore ThreadSafe class
    
    Remove external synchronization on MetricStore

commit b6be2ce229ec3c97cd3bc3861f1989e971599fd7
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-25T11:53:16Z

    fixup! [FLINK-7368][metrics] Make MetricStore ThreadSafe class

----


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146825517
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java ---
    @@ -58,9 +59,9 @@ public void testMalformedNameHandling() {
     		store.add(cd);
     
     		//-----verify that no side effects occur
    -		assertEquals(0, store.jobManager.metrics.size());
    -		assertEquals(0, store.taskManagers.size());
    -		assertEquals(0, store.jobs.size());
    +		assertTrue(store.getTaskManagers().isEmpty());
    --- End diff --
    
    I actually prefer the previous version, as it prints the actual value on failure.
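A small sketch of the difference being described, assuming JUnit 4 style failure messages. The two helper methods below are hypothetical stand-ins that mimic how such assertions report failures; they are not the JUnit API, and the map contents are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class AssertionMessageSketch {

    // Hypothetical stand-in: an equality check that reports both values on failure.
    static void assertEquals(long expected, long actual) {
        if (expected != actual) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Hypothetical stand-in: a boolean check that has nothing to report on failure.
    static void assertTrue(boolean condition) {
        if (!condition) {
            throw new AssertionError();
        }
    }

    /** Runs a check and returns the failure message, or "passed" if it did not fail. */
    static String failureMessage(Runnable check) {
        try {
            check.run();
            return "passed";
        } catch (AssertionError e) {
            return String.valueOf(e.getMessage());
        }
    }

    public static void main(String[] args) {
        Map<String, String> taskManagers = new HashMap<>();
        taskManagers.put("tm-1", "leftover");
        taskManagers.put("tm-2", "leftover");

        // prints "expected:<0> but was:<2>"
        System.out.println(failureMessage(() -> assertEquals(0, taskManagers.size())));
        // prints "null": the failure carries no hint about the actual size
        System.out.println(failureMessage(() -> assertTrue(taskManagers.isEmpty())));
    }
}
```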


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r145565409
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    --- End diff --
    
    Isn't the method synchronization unnecessary now?
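To make the question concrete, here is a minimal sketch, assuming a store backed by a `ConcurrentHashMap` as in the diff. The class and method names are hypothetical and not Flink's code. It shows that `keySet().retainAll(...)` can run without `synchronized` while another thread keeps inserting: the key-set iterator is weakly consistent, so no `ConcurrentModificationException` is thrown. It does not show that the whole retain step is atomic with respect to concurrent adds; if callers rely on that, a lock would still be needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RetainWithoutLockSketch {

    private final Map<String, String> taskManagers = new ConcurrentHashMap<>();

    void add(String id) {
        taskManagers.put(id, "metrics");
    }

    // No synchronized keyword: the concurrent map tolerates concurrent writers.
    void retainTaskManagers(List<String> active) {
        taskManagers.keySet().retainAll(active);
    }

    int size() {
        return taskManagers.size();
    }

    /** Interleaves unsynchronized retains with a concurrent writer, then cleans up once more. */
    static int run() {
        RetainWithoutLockSketch store = new RetainWithoutLockSketch();
        store.add("tm-1");
        List<String> active = Arrays.asList("tm-1");

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                store.add("stale-" + i);
            }
        });
        writer.start();
        for (int i = 0; i < 100; i++) {
            store.retainTaskManagers(active); // races with the writer, but is safe
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Entries added after the last racing retain are removed by this final pass.
        store.retainTaskManagers(active);
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1
    }
}
```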


---

[GitHub] flink pull request #4840: [FLINK-7368][metrics] Make MetricStore ThreadSafe ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146824651
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
     	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -	final Map<String, JobMetricStore> jobs = new HashMap<>();
    +	private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +	private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +	private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeTaskManagers to retain.
    +	 */
    +	public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    +		taskManagers.keySet().retainAll(activeTaskManagers);
    +	}
    +
    +	/**
    +	 * Remove not active task managers.
    +	 *
    +	 * @param activeJobs to retain.
    +	 */
    +	public synchronized void retainJobs(List<String> activeJobs) {
    +		jobs.keySet().retainAll(activeJobs);
    +	}
    +
    +	/**
    +	 * Add metric dumps to the store.
    +	 *
    +	 * @param metricDumps to add.
    +	 */
    +	public synchronized void addAll(List<MetricDump> metricDumps) {
    --- End diff --
    
    This can be package-private.


---