Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2014/12/10 17:02:10 UTC

[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

GitHub user fhueske opened a pull request:

    https://github.com/apache/incubator-flink/pull/258

    [FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom

    The current LocatableInputSplitAssigner assigns local and remote splits without any priority, i.e., every local (or remote) split is equally likely to be assigned.
    With this change, splits are prioritised by the number of hosts that can read them locally. Splits that only a few hosts can read locally are preferred over splits that many hosts can access locally.
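
    The prioritisation described above can be illustrated with a minimal sketch. It is not the code from this pull request: the class `MinLocalCountSketch`, the `Split` type, and the method `pollLocalSplit` are hypothetical names, and the sketch ranks splits by the size of their declared host list, whereas the patch counts only hosts that have actually requested a split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the implementation from this pull request.
final class MinLocalCountSketch {

    // Illustrative stand-in for an input split: an id plus the hosts that store it locally.
    static final class Split {
        final int id;
        final List<String> hosts;

        Split(int id, String... hosts) {
            this.id = id;
            this.hosts = Arrays.asList(hosts);
        }
    }

    // Removes and returns the split that is local to `host` and has the fewest
    // local hosts, or null if no remaining split is local to `host`.
    static Split pollLocalSplit(List<Split> unassigned, String host) {
        Split best = null;
        for (Split s : unassigned) {
            if (s.hosts.contains(host)
                    && (best == null || s.hosts.size() < best.hosts.size())) {
                best = s;
            }
        }
        if (best != null) {
            unassigned.remove(best);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Split> unassigned = new ArrayList<>();
        unassigned.add(new Split(0, "a", "b", "c"));
        unassigned.add(new Split(1, "a"));
        unassigned.add(new Split(2, "a", "b"));

        // Host "a" first receives the split that only "a" can read locally.
        System.out.println(pollLocalSplit(unassigned, "a").id);
        System.out.println(pollLocalSplit(unassigned, "a").id);
        System.out.println(pollLocalSplit(unassigned, "b").id);
    }
}
```

    In this toy setup, host "a" is handed the split that no other host could read locally before it touches the splits shared with "b" and "c", which leaves those more flexible splits available to the other hosts.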

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/incubator-flink splitAssignment

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/258.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #258
    
----
commit 8900b91dc50a55d1149482c3f9c10dc3eaa5048c
Author: Fabian Hueske <fh...@apache.org>
Date:   2014-12-07T21:28:22Z

    [FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/258



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21744870
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -36,89 +33,111 @@
     
     /**
      * The locatable input split assigner assigns to each host splits that are local, before assigning
    - * splits that are not local. 
    + * splits that are not local.
      */
     public final class LocatableInputSplitAssigner implements InputSplitAssigner {
     
     	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
     
    +	// unassigned input splits
    +	private final Set<LocatableInputSplitWithCount> unassigned = new HashSet<LocatableInputSplitWithCount>();
    +
    +	// input splits indexed by host for local assignment
    +	private final ConcurrentHashMap<String, LocatableInputSplitChooser> localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
    +
    +    // unassigned splits for remote assignment
    +	private final LocatableInputSplitChooser remoteSplitChooser;
     
    -	private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
    -	
    -	private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
    -	
     	private int localAssignments;		// lock protected by the unassigned set lock
    -	
    +
     	private int remoteAssignments;		// lock protected by the unassigned set lock
     
     	// --------------------------------------------------------------------------------------------
    -	
    +
     	public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
    -		this.unassigned.addAll(splits);
    +		for(LocatableInputSplit split : splits) {
    +			this.unassigned.add(new LocatableInputSplitWithCount(split));
    +		}
    +		this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
     	}
    -	
    +
     	public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
    -		Collections.addAll(this.unassigned, splits);
    +		for(LocatableInputSplit split : splits) {
    +			this.unassigned.add(new LocatableInputSplitWithCount(split));
    +		}
    +		this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
     
     	@Override
     	public LocatableInputSplit getNextInputSplit(String host) {
    -		// for a null host, we return an arbitrary split
    +
    +		// for a null host, we return a remote split
     		if (host == null) {
    -			
    -			synchronized (this.unassigned) {
    -				Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
    -				if (iter.hasNext()) {
    -					LocatableInputSplit next = iter.next();
    -					iter.remove();
    -					
    -					if (LOG.isInfoEnabled()) {
    -						LOG.info("Assigning split to null host (random assignment).");
    -					}
    -					
    -					remoteAssignments++;
    -					return next;
    -				} else {
    -					if (LOG.isDebugEnabled()) {
    -						LOG.debug("No more unassigned input splits remaining.");
    +			synchronized (this.remoteSplitChooser) {
    +				synchronized (this.unassigned) {
    +
    +					LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
    +
    +					if (split != null) {
    +						// got a split to assign. Double check that it hasn't been assigned before.
    +						if (this.unassigned.remove(split)) {
    +							if (LOG.isInfoEnabled()) {
    +								LOG.info("Assigning split to null host (random assignment).");
    +							}
    +
    +							remoteAssignments++;
    +							return split.getSplit();
    +						} else {
    +							throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
    +						}
    +					} else {
    +						// all splits consumed
    +						if (LOG.isDebugEnabled()) {
    +							LOG.debug("No more unassigned input splits remaining.");
    +						}
    +						return null;
     					}
    -					return null;
     				}
     			}
     		}
    -		
    +
     		host = host.toLowerCase(Locale.US);
    -		
    +
     		// for any non-null host, we take the list of non-null splits
    -		List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
    -		
    +		LocatableInputSplitChooser localSplits = this.localPerHost.get(host);
    +
     		// if we have no list for this host yet, create one
     		if (localSplits == null) {
    -			localSplits = new ArrayList<LocatableInputSplit>(16);
    -			
    +			localSplits = new LocatableInputSplitChooser();
    +
     			// lock the list, to be sure that others have to wait for that host's local list
     			synchronized (localSplits) {
    -				List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
    -				
    +				LocatableInputSplitChooser prior = this.localPerHost.putIfAbsent(host, localSplits);
    +
     				// if someone else beat us in the case to create this list, then we do not populate this one, but
     				// simply work with that other list
     				if (prior == null) {
     					// we are the first, we populate
    -					
    +
     					// first, copy the remaining splits to release the lock on the set early
     					// because that is shared among threads
    -					LocatableInputSplit[] remaining;
    +					LocatableInputSplitWithCount[] remaining;
     					synchronized (this.unassigned) {
    -						remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
    +						remaining = (LocatableInputSplitWithCount[]) this.unassigned.toArray(new LocatableInputSplitWithCount[this.unassigned.size()]);
    --- End diff --
    
    I think the cast is not necessary?
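
    A small stand-alone sketch of why the cast can be dropped: `Collection.toArray(T[])` is a generic method that already returns `T[]`. The class `ToArrayCastSketch` and the method `snapshot` are hypothetical names, and `String` stands in for `LocatableInputSplitWithCount`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; String stands in for LocatableInputSplitWithCount.
final class ToArrayCastSketch {

    // Copies the set into a typed array. toArray(T[]) is declared as
    // <T> T[] toArray(T[] a), so the result is already a String[] and
    // needs no explicit (String[]) cast.
    static String[] snapshot(Set<String> unassigned) {
        return unassigned.toArray(new String[unassigned.size()]);
    }

    public static void main(String[] args) {
        Set<String> unassigned = new HashSet<>();
        unassigned.add("split-0");
        unassigned.add("split-1");

        String[] remaining = snapshot(unassigned);
        System.out.println(remaining.length);
    }
}
```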



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/258#issuecomment-66476877
  
    I tested the pull request on a cluster. The split assignment is now much better.



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by fhueske <gi...@git.apache.org>.
GitHub user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/258#issuecomment-66782412
  
    Thanks for the review! Will merge it now.



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21743890
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -36,89 +33,111 @@
     
     /**
      * The locatable input split assigner assigns to each host splits that are local, before assigning
    - * splits that are not local. 
    + * splits that are not local.
      */
     public final class LocatableInputSplitAssigner implements InputSplitAssigner {
     
     	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
     
    +	// unassigned input splits
    +	private final Set<LocatableInputSplitWithCount> unassigned = new HashSet<LocatableInputSplitWithCount>();
    +
    +	// input splits indexed by host for local assignment
    +	private final ConcurrentHashMap<String, LocatableInputSplitChooser> localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
    +
    +    // unassigned splits for remote assignment
    --- End diff --
    
    indentation is off



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/258#issuecomment-66782368
  
    Changes look good to me.
    
    I haven't tested it on a cluster, but Robert's tests should be fine. :)



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21745099
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -184,15 +209,159 @@ private static final boolean isLocal(String flinkHost, String[] hosts) {
     				return true;
     			}
     		}
    -		
    +
     		return false;
     	}
    -	
    +
     	public int getNumberOfLocalAssignments() {
     		return localAssignments;
     	}
    -	
    +
     	public int getNumberOfRemoteAssignments() {
     		return remoteAssignments;
     	}
    +
    +    /**
    +     * Wraps a LocatableInputSplit and adds a count for the number of observed hosts
    +     * that can access the split locally.
    +     */
    +	public static class LocatableInputSplitWithCount {
    +
    +		private final LocatableInputSplit split;
    +		private int localCount;
    +
    +		public LocatableInputSplitWithCount(LocatableInputSplit split) {
    +			this.split = split;
    +			this.localCount = 0;
    +		}
    +
    +		public void incrementLocalCount() {
    +			this.localCount++;
    +		}
    +
    +		public int getLocalCount() {
    +			return this.localCount;
    +		}
    +
    +		public LocatableInputSplit getSplit() {
    +			return this.split;
    +		}
    +
    +	}
    +
    +	/**
    +	 * Holds a list of LocatableInputSplits and returns the split with the lowest local count.
    +	 * The rationale is that splits which are local on few hosts should be preferred over others which
    +     * have more degrees of freedom for local assignment.
    +	 *
    +	 * Internally, the splits are stored in a linked list. Sorting the list is not a good solution,
    +	 * as local counts are updated whenever a previously unseen host requests a split.
    +	 * Instead, we track the minimum local count and iteratively look for splits with that minimum count.
    +	 */
    +	public static class LocatableInputSplitChooser {
    --- End diff --
    
    We could reduce the visibility of this class (the same applies to LocatableInputSplitWithCount).



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21744330
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -184,15 +209,159 @@ private static final boolean isLocal(String flinkHost, String[] hosts) {
     				return true;
     			}
     		}
    -		
    +
     		return false;
     	}
    -	
    +
     	public int getNumberOfLocalAssignments() {
     		return localAssignments;
     	}
    -	
    +
     	public int getNumberOfRemoteAssignments() {
     		return remoteAssignments;
     	}
    +
    +    /**
    +     * Wraps a LocatableInputSplit and adds a count for the number of observed hosts
    +     * that can access the split locally.
    +     */
    +	public static class LocatableInputSplitWithCount {
    +
    +		private final LocatableInputSplit split;
    +		private int localCount;
    +
    +		public LocatableInputSplitWithCount(LocatableInputSplit split) {
    +			this.split = split;
    +			this.localCount = 0;
    +		}
    +
    +		public void incrementLocalCount() {
    +			this.localCount++;
    +		}
    +
    +		public int getLocalCount() {
    +			return this.localCount;
    +		}
    +
    +		public LocatableInputSplit getSplit() {
    +			return this.split;
    +		}
    +
    +	}
    +
    +	/**
    +	 * Holds a list of LocatableInputSplits and returns the split with the lowest local count.
    +	 * The rationale is that splits which are local on few hosts should be preferred over others which
    +     * have more degrees of freedom for local assignment.
    --- End diff --
    
    indentation is off



[GitHub] incubator-flink pull request: [FLINK-1287] LocalizableSplitAssigne...

Posted by uce <gi...@git.apache.org>.
GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21744326
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -184,15 +209,159 @@ private static final boolean isLocal(String flinkHost, String[] hosts) {
     				return true;
     			}
     		}
    -		
    +
     		return false;
     	}
    -	
    +
     	public int getNumberOfLocalAssignments() {
     		return localAssignments;
     	}
    -	
    +
     	public int getNumberOfRemoteAssignments() {
     		return remoteAssignments;
     	}
    +
    +    /**
    +     * Wraps a LocatableInputSplit and adds a count for the number of observed hosts
    --- End diff --
    
    indentation is off

