You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/28 22:20:56 UTC

drill git commit: DRILL-2725: Faster work assignment logic

Repository: drill
Updated Branches:
  refs/heads/master 927d1998f -> 5f1d6d7bb


DRILL-2725: Faster work assignment logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5f1d6d7b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5f1d6d7b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5f1d6d7b

Branch: refs/heads/master
Commit: 5f1d6d7bbe58d4e40abf911483fa1f24ecd71050
Parents: 927d199
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Apr 2 23:37:16 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 28 11:52:21 2015 -0700

----------------------------------------------------------------------
 .../exec/store/schedule/AssignmentCreator.java  | 248 +++++++++++++------
 1 file changed, 170 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5f1d6d7b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 7e9c4c9..b5eee5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -17,15 +17,20 @@
  */
 package org.apache.drill.exec.store.schedule;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
@@ -37,104 +42,191 @@ import com.google.common.collect.Lists;
 public class AssignmentCreator<T extends CompleteWork> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
 
-  static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
-  private final ArrayListMultimap<Integer, T> mappings;
-  private final List<DrillbitEndpoint> endpoints;
 
+  /**
+   * Comparator used to sort in order of decreasing affinity
+   */
+  private static Comparator<Entry<DrillbitEndpoint,Long>> comparator = new Comparator<Entry<DrillbitEndpoint,Long>>() {
+    @Override
+    public int compare(Entry<DrillbitEndpoint, Long> o1, Entry<DrillbitEndpoint,Long> o2) {
+      return (int) (o1.getValue() - o2.getValue());
+    }
+  };
+
+  /**
+   * the maximum number of work units to assign to any minor fragment
+   */
+  private int maxWork;
+
+  /**
+   * The units of work to be assigned
+   */
+  private List<T> units;
+
+  /**
+   * A list of DrillbitEndpoints, where the index in the list corresponds to the minor fragment id
+   */
+  private List<DrillbitEndpoint> incomingEndpoints;
 
+  private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    this.incomingEndpoints = incomingEndpoints;
+    this.units = units;
+  }
 
   /**
-   * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
-   * Drillbits.
+   * Assign each unit of work to a minor fragment, given a list of DrillbitEndpoints whose index in the list corresponds
+   * to the minor fragment id for each fragment. A given DrillbitEndpoint can appear multiple times in this list. This method
+   * will try to assign work based on the affinity of each work unit, but will also evenly distribute the work units among
+   * all of the minor fragments.
    *
-   * @param incomingEndpoints
-   *          The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
-   *          multiple slices on a node working on the task simultaneously.
-   * @param units
-   *          The work units to assign.
-   * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
+   * @param incomingEndpoints The list of incomingEndpoints, indexed by minor fragment id
+   * @param units the list of work units to be assigned
+   * @return A multimap that maps each minor fragment id to a list of work units
    */
-  public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
-      List<T> units) {
-    AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
-    return creator.mappings;
+  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    AssignmentCreator<T> creator = new AssignmentCreator(incomingEndpoints, units);
+    return creator.getMappings();
   }
 
-  private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
-    logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
+  /**
+   * Does the work of creating the mappings for this AssignmentCreator
+   * @return the minor fragment id to work units mapping
+   */
+  private ListMultimap<Integer, T> getMappings() {
     Stopwatch watch = new Stopwatch();
+    watch.start();
+    maxWork = (int) Math.ceil(units.size() / ((float) incomingEndpoints.size()));
+    LinkedList<WorkEndpointListPair<T>> workList = getWorkList();
+    LinkedList<WorkEndpointListPair<T>> unassignedWorkList = Lists.newLinkedList();
+    Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
+    ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+
+    outer: for (WorkEndpointListPair<T> workPair : workList) {
+      List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
+      for (DrillbitEndpoint endpoint : endpoints) {
+        FragIteratorWrapper iteratorWrapper = endpointIterators.get(endpoint);
+        if (iteratorWrapper == null) {
+          continue;
+        }
+        if (iteratorWrapper.count < iteratorWrapper.maxCount) {
+          Integer assignment = iteratorWrapper.iter.next();
+          iteratorWrapper.count++;
+          mappings.put(assignment, workPair.work);
+          continue outer;
+        }
+      }
+      unassignedWorkList.add(workPair);
+    }
 
-    Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
-        + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
-    this.mappings = ArrayListMultimap.create();
-    this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
-    ArrayList<T> rowGroupList = new ArrayList<>(units);
-    for (double cutoff : ASSIGNMENT_CUTOFFS) {
-      scanAndAssign(rowGroupList, cutoff, false, false);
+    outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
+      while (iteratorWrapper.count < iteratorWrapper.maxCount) {
+        WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
+        if (workPair == null) {
+          break outer;
+        }
+        Integer assignment = iteratorWrapper.iter.next();
+        iteratorWrapper.count++;
+        mappings.put(assignment, workPair.work);
+      }
     }
-    scanAndAssign(rowGroupList, 0.0, true, false);
-    scanAndAssign(rowGroupList, 0.0, true, true);
 
-    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
-    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
-    Preconditions.checkState(!units.isEmpty());
+    logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
+    return mappings;
+  }
 
+  /**
+   * Builds the list of WorkEndpointListPairs, which pair a work unit with a list of endpoints sorted by affinity
+   * @return the list of WorkEndpointListPairs
+   */
+  private LinkedList<WorkEndpointListPair<T>> getWorkList() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    LinkedList<WorkEndpointListPair<T>> workList = Lists.newLinkedList();
+    for (T work : units) {
+      List<Map.Entry<DrillbitEndpoint,Long>> entries = Lists.newArrayList();
+      for (ObjectLongCursor<DrillbitEndpoint> cursor : work.getByteMap()) {
+        final DrillbitEndpoint ep = cursor.key;
+        final Long val = cursor.value;
+        Map.Entry<DrillbitEndpoint,Long> entry = new Entry() {
+
+          @Override
+          public Object getKey() {
+            return ep;
+          }
+
+          @Override
+          public Object getValue() {
+            return val;
+          }
+
+          @Override
+          public Object setValue(Object value) {
+            throw new UnsupportedOperationException();
+          }
+        };
+        entries.add(entry);
+      }
+      Collections.sort(entries, comparator);
+      List<DrillbitEndpoint> sortedEndpoints = Lists.newArrayList();
+      for (Entry<DrillbitEndpoint,Long> entry : entries) {
+        sortedEndpoints.add(entry.getKey());
+      }
+      workList.add(new WorkEndpointListPair<T>(work, sortedEndpoints));
+    }
+    return workList;
   }
 
   /**
-   *
-   * @param mappings
-   *          the mapping between fragment/endpoint and rowGroup
-   * @param endpoints
-   *          the list of drillbits, ordered by the corresponding fragment
-   * @param workunits
-   *          the list of rowGroups to assign
-   * @param requiredPercentage
-   *          the percentage of max bytes required to make an assignment
-   * @param assignAll
-   *          if true, will assign even if no affinity
+   *  A wrapper class around a work unit and its associated sorted list of endpoints (sorted by affinity in decreasing order)
    */
-  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
-    Collections.sort(workunits);
-    int fragmentPointer = 0;
-    final boolean requireAffinity = requiredPercentage > 0;
-    int maxAssignments = (int) (workunits.size() / endpoints.size());
-
-    if (maxAssignments < 1) {
-      maxAssignments = 1;
+  private static class WorkEndpointListPair<T> {
+    T work;
+    List<DrillbitEndpoint> sortedEndpoints;
+
+    WorkEndpointListPair(T work, List<DrillbitEndpoint> sortedEndpoints) {
+      this.work = work;
+      this.sortedEndpoints = sortedEndpoints;
     }
+  }
 
-    for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
-      T unit = iter.next();
-      for (int i = 0; i < endpoints.size(); i++) {
-        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
-        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
-        EndpointByteMap endpointByteMap = unit.getByteMap();
-        boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
-        if (assignAll
-            || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
-            || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
-                && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
-                .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
-          mappings.put(minorFragmentId, unit);
-          logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
-          // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
-          // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
-          // if (bytesPerEndpoint.get(currentEndpoint) != null) {
-          // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
-          // } else {
-          // // assignmentAffinityStats.update(0);
-          // }
-          iter.remove();
-          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
-          break;
-        }
+  /**
+   * Groups minor fragments together by corresponding endpoint, and creates an iterator that can be used to evenly
+   * distribute work assigned to a given endpoint across all of its corresponding minor fragments
+   *
+   * @return a map from each endpoint to a wrapper holding a cycling iterator over that endpoint's minor fragment ids
+   */
+  private Map<DrillbitEndpoint,FragIteratorWrapper> getEndpointIterators() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    Map<DrillbitEndpoint,FragIteratorWrapper> map = Maps.newLinkedHashMap();
+    Map<DrillbitEndpoint,List<Integer>> mmap = Maps.newLinkedHashMap();
+    for (int i = 0; i < incomingEndpoints.size(); i++) {
+      DrillbitEndpoint endpoint = incomingEndpoints.get(i);
+      List<Integer> intList = mmap.get(incomingEndpoints.get(i));
+      if (intList == null) {
+        intList = Lists.newArrayList();
       }
+      intList.add(Integer.valueOf(i));
+      mmap.put(endpoint, intList);
+    }
 
+    for (DrillbitEndpoint endpoint : mmap.keySet()) {
+      FragIteratorWrapper wrapper = new FragIteratorWrapper();
+      wrapper.iter = Iterators.cycle(mmap.get(endpoint));
+      wrapper.maxCount = maxWork * mmap.get(endpoint).size();
+      map.put(endpoint, wrapper);
     }
+    return map;
+  }
+
+  /**
+   * A struct that holds a fragment iterator and keeps track of how many units have been assigned, as well as the maximum
+   * number of assignments it will accept
+   */
+  private static class FragIteratorWrapper {
+    int count = 0;
+    int maxCount;
+    Iterator<Integer> iter;
   }
 
 }