Posted to issues@flink.apache.org by bbayani <gi...@git.apache.org> on 2017/08/30 17:06:28 UTC

[GitHub] flink pull request #4628: [FLINK-7486]:[flink-mesos]:Support for adding uniq...

GitHub user bbayani opened a pull request:

    https://github.com/apache/flink/pull/4628

    [FLINK-7486]:[flink-mesos]:Support for adding unique attribute / group_by attribute constraints
    
    JIRA issue: FLINK-7486
    ## What is the purpose of the change
    - To add a soft constraint that balances tasks across Mesos agents on the basis of host attributes
    
    ## Brief change log
    - Added a new config option `mesos.constraints.soft.balanced`
    - Parsed the config in `MesosTaskManagerParameters` and built a list of `BalancedHostAttrConstraintParams` objects
    - Built a set of task IDs from `workersInNew` and `workersInLaunch` in `MesosResourceManager`/`MesosFlinkResourceManager`, and used this set in the `coTasksGetter` to obtain the co-task IDs
    - Updated `LaunchableMesosWorker` to return the soft constraints derived from the `BalancedHostAttrConstraintParams` objects
    
    ## Verifying this change
      - Manually verified the change by running Flink on a Mesos cluster whose agents had the `AZ` and `hostname` attributes set. After adding the constraint, TaskManagers were launched on Mesos agents with unique AZ/hostname values.
     
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: don't know
      - The runtime per-record code paths (performance sensitive): don't know
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes (flink deployment on mesos)
    
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? Docs: the feature is controlled by the `mesos.constraints.soft.balanced` option in `flink-conf.yaml`, which is documented in `docs/ops/deployment/mesos.md`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbayani/flink soft_balanced_constraint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4628.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4628
    
----
commit c333344ba0b57d94bb3e1ae7c88e50d2a8549a8b
Author: bbayani <bb...@cisco.com>
Date:   2017-08-30T16:36:45Z

    [FLINK-7486]:[flink-mesos]:Support for adding unique attribute / group_by attribute constraints

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140032946
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -675,6 +679,42 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID)
     	}
     
     	/**
    +	 * Sets a coTaskGetter callback for evaluating balancing constraint.
    +	 */
    +	private void setCoTaskGetter() {
    +		for (MesosTaskManagerParameters.BalancedHostAttrConstraintParams param : taskManagerParameters.balancedConstraintParams()) {
    +			param.setCoTasksGetter(new Func1<String, Set<String>>() {
    +				@Override
    +				public Set<String> call(String s) {
    +					Map<String, Set<String>> taskToCoTasksMap = new HashMap<>();
    +					Set <String> taskIds = getTaskIdsSet();
    +					for (String taskId : taskIds) {
    +						Set <String> coTaskIds = new HashSet<>(taskIds);
    +						coTaskIds.remove(taskId);
    +						taskToCoTasksMap.put(taskId, coTaskIds);
    +					}
    +					return taskToCoTasksMap.get(s);
    +				}
    +			});
    +		}
    +	}
    +
    +	/**
    +	 * Compiles the set of task IDs in new/launch state.
    +	 * @return The unique TaskIDs
    +	 */
    +	private Set<String> getTaskIdsSet() {
    +		Set<String> taskIds = new HashSet<String>();
    +		List <MesosWorkerStore.Worker> workers = new ArrayList<MesosWorkerStore.Worker>();
    +		workers.addAll(this.workersInNew.values());
    +		workers.addAll(this.workersInLaunch.values());
    --- End diff --
    
    I think we don't have to add the `workersInNew.values()` and `workersInLaunch.values()` first to `workers` and only then to `taskIds`. We can add them directly to `taskIds`, which saves one copy operation.
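A minimal sketch of that simplification, collecting task IDs straight into the result set. `Worker` and its `taskId()` accessor are hypothetical stand-ins for `MesosWorkerStore.Worker` and for however the real code extracts a task ID (the diff is cut off before that step), and the two maps are passed in as parameters rather than read from resource-manager fields.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; not the code from the pull request.
class TaskIdCollector {

    /** Minimal stand-in for a stored worker; only its task ID matters here. */
    static final class Worker {
        private final String taskId;

        Worker(String taskId) {
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }
    }

    /**
     * Collects the task IDs of workers in the "new" and "launch" states,
     * adding them directly to the result set without an intermediate list.
     */
    static Set<String> getTaskIdsSet(Map<?, Worker> workersInNew, Map<?, Worker> workersInLaunch) {
        Set<String> taskIds = new HashSet<>();
        for (Worker worker : workersInNew.values()) {
            taskIds.add(worker.taskId());
        }
        for (Worker worker : workersInLaunch.values()) {
            taskIds.add(worker.taskId());
        }
        return taskIds;
    }
}
```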


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140034034
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -675,6 +679,42 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID)
     	}
     
     	/**
    +	 * Sets a coTaskGetter callback for evaluating balancing constraint.
    +	 */
    +	private void setCoTaskGetter() {
    +		for (MesosTaskManagerParameters.BalancedHostAttrConstraintParams param : taskManagerParameters.balancedConstraintParams()) {
    +			param.setCoTasksGetter(new Func1<String, Set<String>>() {
    +				@Override
    +				public Set<String> call(String s) {
    +					Map<String, Set<String>> taskToCoTasksMap = new HashMap<>();
    +					Set <String> taskIds = getTaskIdsSet();
    +					for (String taskId : taskIds) {
    +						Set <String> coTaskIds = new HashSet<>(taskIds);
    +						coTaskIds.remove(taskId);
    +						taskToCoTasksMap.put(taskId, coTaskIds);
    +					}
    +					return taskToCoTasksMap.get(s);
    --- End diff --
    
    I think you're doing a lot of redundant work here. Wouldn't `taskIds.remove(s)` simply do the same?
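A sketch of the simplified getter, assuming `java.util.function.Function` in place of Fenzo's `Func1` and a hypothetical `Supplier` standing in for `getTaskIdsSet()`. Instead of building a map entry for every task, it copies the current set once and removes the requested ID.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper; Function replaces Fenzo's Func1 so this compiles without Fenzo.
class CoTasksGetterSketch {

    /**
     * Returns a getter that, for a given task ID, yields every other known task ID.
     * The supplier is queried on each call, so the result reflects the current tasks.
     */
    static Function<String, Set<String>> coTasksGetter(Supplier<Set<String>> taskIdsSupplier) {
        return taskId -> {
            // Copy so that removing the task does not mutate the supplier's set.
            Set<String> coTaskIds = new HashSet<>(taskIdsSupplier.get());
            coTaskIds.remove(taskId);
            return coTaskIds;
        };
    }
}
```

One behavioral difference from the diff: for a task ID that is not in the set, this returns all known IDs rather than `null`.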


---

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140051191
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -663,6 +666,7 @@ private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
     	// ------------------------------------------------------------------------
     
     	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
    +		setCoTaskGetter();
    --- End diff --
    
    I believe this approach violates an Akka rule by passing a direct reference to the RM object to the launch coordinator: the getter reads the RM's `workersInNew` and `workersInLaunch` maps from outside the RM actor. I understand the challenge to be that Fenzo's `BalancedHostAttrConstraint` expects a `coTasksGetter` function ([ref](http://netflix.github.io/Fenzo/fenzo-core/com/netflix/fenzo/plugins/BalancedHostAttrConstraint.html#BalancedHostAttrConstraint-com.netflix.fenzo.functions.Func1-java.lang.String-int-)). But that function cannot keep a reference to the RM object.
    
    An imperfect solution would be to pass the `taskIds` to the `LaunchableMesosWorker` constructor; imperfect because the list wouldn't be dynamic.
    
    In truth we want all tasks to be balanced, so the getter is actually overkill. If you look at the implementation of `BalancedHostAttrConstraint` ([ref](https://github.com/Netflix/Fenzo/blob/master/fenzo-core/src/main/java/com/netflix/fenzo/plugins/BalancedHostAttrConstraint.java)), it uses the function output to look up tasks in the `TaskTrackerState`. A custom constraint could simply use all tasks in `TaskTrackerState` without needing a `coTasksGetter`.
    
    Once we get this sorted out, some of the code should be moved to a utility class.
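A rough sketch of balancing against all tasks rather than a co-task subset. It is deliberately independent of Fenzo: the attribute values of already-assigned tasks are passed in as a plain `Iterable` (in Fenzo they would come from `TaskTrackerState`), the class name is hypothetical, and the scoring `1 / (1 + count)` is an illustrative choice, not the formula `BalancedHostAttrConstraint` uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical scorer illustrating a "balance across all tasks" soft constraint.
class BalancedAttributeScorer {

    /**
     * Scores a candidate attribute value (e.g. the AZ of an offered host) against
     * the attribute values of all currently assigned tasks. Returns a fitness in
     * (0, 1]: 1.0 for an unused value, lower for values already hosting more tasks.
     */
    static double fitness(String candidateValue, Iterable<String> assignedTaskValues) {
        // Count how many assigned tasks sit on each attribute value.
        Map<String, Integer> counts = new HashMap<>();
        for (String value : assignedTaskValues) {
            counts.merge(value, 1, Integer::sum);
        }
        int onCandidate = counts.getOrDefault(candidateValue, 0);
        // Simple inverse scoring; not Fenzo's formula.
        return 1.0 / (1 + onCandidate);
    }
}
```

With no co-task lookup involved, nothing in the constraint needs a reference back to the resource manager.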


---

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140038861
  
    --- Diff: docs/ops/deployment/mesos.md ---
    @@ -226,6 +226,10 @@ When running Flink with Marathon, the whole Flink cluster including the job mana
     Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target
     mesos agents.  Example: `az:eu-west-1a,series:t2`
     
    +`mesos.constraints.soft.balanced`: Soft Constraints for balancing the tasks across mesos based on agent attributes (**DEFAULT**: None).
    +Takes a comma-separated list of key=value pairs. Key corresponds to host attribute and value is number of expected unique values for given host attribute.
    +Example: `az=3,rack_id=4`
    --- End diff --
    
    This syntax seems confusing, since it reads like a constraint that `az` must equal `3`. How about `az(3),rack_id(4)`?
    
    Feel free to use a regular expression to easily parse it (just a suggestion): `(?:(\w+)\((\d+)\)(?=,|$))`
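A minimal parser for the proposed `az(3),rack_id(4)` syntax using the suggested regular expression; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the proposed mesos.constraints.soft.balanced syntax.
class BalancedConstraintConfigParser {

    // The suggested expression: attr(count), followed by ',' or end of input.
    private static final Pattern CONSTRAINT = Pattern.compile("(?:(\\w+)\\((\\d+)\\)(?=,|$))");

    /** Parses e.g. "az(3),rack_id(4)" into {az=3, rack_id=4}, preserving order. */
    static Map<String, Integer> parse(String value) {
        Map<String, Integer> result = new LinkedHashMap<>();
        Matcher matcher = CONSTRAINT.matcher(value);
        while (matcher.find()) {
            result.put(matcher.group(1), Integer.parseInt(matcher.group(2)));
        }
        return result;
    }
}
```

Because `find()` scans for matches, malformed entries are skipped silently (`az(3),rack` yields only `az`), and attribute names containing characters outside `\w`, such as `-`, are matched only partially (`rack-id(4)` yields `id`). A stricter parser would probably reject such input.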


---

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140022455
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
    @@ -155,7 +155,7 @@ public int getPorts() {
     
     		@Override
     		public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
    -			return null;
    +			return params.softConstraints();
    --- End diff --
    
    Let's pass the soft constraints to the `LaunchableMesosWorker` through its constructor rather than fetching them from the params. The params aren't a good vehicle for this information.
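A sketch of constructor injection for the soft constraints. The type parameter `C` stands in for Fenzo's `VMTaskFitnessCalculator` so the example compiles without Fenzo, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, trimmed-down worker; C stands in for VMTaskFitnessCalculator.
class LaunchableWorkerSketch<C> {

    private final String taskId;
    private final List<C> softConstraints;

    /** Soft constraints are supplied by the caller instead of read from the TaskManager params. */
    LaunchableWorkerSketch(String taskId, List<? extends C> softConstraints) {
        this.taskId = taskId;
        // Unmodifiable copy: later changes to the caller's list are not visible here.
        this.softConstraints = Collections.unmodifiableList(new ArrayList<>(softConstraints));
    }

    String taskId() {
        return taskId;
    }

    List<C> getSoftConstraints() {
        return softConstraints;
    }
}
```

Taking an unmodifiable copy keeps the worker independent of whoever built the list.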


---

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140021958
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---
    @@ -366,6 +425,34 @@ public String call(String s) {
     	}
     
     	/**
    +	 * Internal class that stores the parsed information about soft constraint
    +	 * It encapsulates fields:
    +	 * 	1. Host attribute name
    +	 * 	2. Expected number of unique values for given host attribute
    +	 *	3. A callback coTaskGetter used while evaluating balancing constraint
    +	 */
    +	static class BalancedHostAttrConstraintParams {
    --- End diff --
    
    Let's make this configuration class immutable: mark the `hostAttr` and `numOfExpectedUniqueValues` fields as `final`, and move `coTasksGetter` elsewhere.
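A sketch of the immutable configuration class, with both fields `final` and no `coTasksGetter`. The class name and getter names are hypothetical, and the positive-count check is an added assumption rather than something the review asks for.

```java
import java.util.Objects;

/** Hypothetical immutable holder for one parsed balancing constraint; the co-task getter lives elsewhere. */
final class BalancedHostAttrConstraintParamsSketch {

    private final String hostAttr;
    private final int numOfExpectedUniqueValues;

    BalancedHostAttrConstraintParamsSketch(String hostAttr, int numOfExpectedUniqueValues) {
        this.hostAttr = Objects.requireNonNull(hostAttr, "hostAttr");
        // Assumed validation: a balancing constraint needs at least one expected value.
        if (numOfExpectedUniqueValues <= 0) {
            throw new IllegalArgumentException("numOfExpectedUniqueValues must be positive");
        }
        this.numOfExpectedUniqueValues = numOfExpectedUniqueValues;
    }

    String getHostAttr() {
        return hostAttr;
    }

    int getNumOfExpectedUniqueValues() {
        return numOfExpectedUniqueValues;
    }
}
```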


---

[GitHub] flink issue #4628: [FLINK-7486]:[flink-mesos]:Support for adding unique attr...

Posted by bbayani <gi...@git.apache.org>.
Github user bbayani commented on the issue:

    https://github.com/apache/flink/pull/4628
  
    @EronWright, @tillrohrmann, could you please review this?


---