You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/30 23:16:31 UTC

[1/2] drill git commit: DRILL-2916: Correctly set minimum per fragment in AssignmentCreator

Repository: drill
Updated Branches:
  refs/heads/master f29444fa2 -> f8a91f14b


DRILL-2916: Correctly set minimum per fragment in AssignmentCreator


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0f6c7ff5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0f6c7ff5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0f6c7ff5

Branch: refs/heads/master
Commit: 0f6c7ff506ba0ead612c77830e4b6daac9a7541c
Parents: f29444f
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Apr 30 11:02:33 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu Apr 30 11:06:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/store/schedule/AssignmentCreator.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0f6c7ff5/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index bfc104f..1f4d8cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -247,14 +247,14 @@ public class AssignmentCreator<T extends CompleteWork> {
       FragIteratorWrapper wrapper = new FragIteratorWrapper();
       wrapper.iter = Iterators.cycle(mmap.get(endpoint));
       wrapper.maxCount = maxWork * mmap.get(endpoint).size();
-      wrapper.minCount = Math.max((maxWork - 1) * mmap.get(endpoint).size(), 1);
+      wrapper.minCount = Math.max(maxWork - 1, 1) * mmap.get(endpoint).size();
       map.put(endpoint, wrapper);
     }
     return map;
   }
 
   /**
-   * A struct that holds an fragment iterator and keeps track of how many units have been assigned, as well as the maximum
+   * A struct that holds a fragment iterator and keeps track of how many units have been assigned, as well as the maximum
    * number of assignment it will accept
    */
   private static class FragIteratorWrapper {


[2/2] drill git commit: DRILL-2916: Allow system option to revert to old assignment logic

Posted by sm...@apache.org.
DRILL-2916: Allow system option to revert to old assignment logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f8a91f14
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f8a91f14
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f8a91f14

Branch: refs/heads/master
Commit: f8a91f14bcd8acc61de417b29d75cbb85789da25
Parents: 0f6c7ff
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Apr 30 12:34:54 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu Apr 30 13:57:48 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/store/dfs/easy/EasyGroupScan.java      |   2 +-
 .../exec/store/parquet/ParquetGroupScan.java    |   2 +-
 .../exec/store/schedule/AssignmentCreator.java  |  13 +-
 .../store/schedule/OldAssignmentCreator.java    | 141 +++++++++++++++++++
 .../drill/exec/store/store/TestAssignment.java  |   2 +-
 7 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 633f571..9f87b0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -227,4 +227,7 @@ public interface ExecConstants {
   public static final String NEW_VIEW_DEFAULT_PERMS_KEY = "new_view_default_permissions";
   public static final OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
       new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
+
+  public static final String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
+  public static final OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9127c7c..33b2a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -100,6 +100,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.HASH_AGG_TABLE_FACTOR,
       ExecConstants.AVERAGE_FIELD_WIDTH,
       ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
+      ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
       QueryClassLoader.JAVA_COMPILER_DEBUG,

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 1b333ac..39dc073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -183,7 +183,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
   }
 
   private void createMappings(List<EndpointAffinity> affinities) {

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 37f19ac..cf39518 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -384,7 +384,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
 
-    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 1f4d8cb..e76826d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -30,12 +30,14 @@ import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.server.DrillbitContext;
 
 /**
  * The AssignmentCreator is responsible for assigning a set of work units to the available slices.
@@ -89,9 +91,14 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param units the list of work units to be assigned
    * @return A multimap that maps each minor fragment id to a list of work units
    */
-  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
-    AssignmentCreator<T> creator = new AssignmentCreator(incomingEndpoints, units);
-    return creator.getMappings();
+  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units, DrillbitContext context) {
+    boolean useOldAssignmentCode = context == null ? false : context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val;
+    if (useOldAssignmentCode) {
+      return OldAssignmentCreator.getMappings(incomingEndpoints, units);
+    } else {
+      AssignmentCreator<T> creator = new AssignmentCreator(incomingEndpoints, units);
+      return creator.getMappings();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
new file mode 100644
index 0000000..858cfef
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+/**
+ * The OldAssignmentCreator is responsible for assigning a set of work units to the available slices.
+ */
+public class OldAssignmentCreator<T extends CompleteWork> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
+
+  static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
+  private final ArrayListMultimap<Integer, T> mappings;
+  private final List<DrillbitEndpoint> endpoints;
+
+
+
+
+  /**
+   * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
+   * Drillbits.
+   *
+   * @param incomingEndpoints
+   *          The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
+   *          multiple slices on a node working on the task simultaneously.
+   * @param units
+   *          The work units to assign.
+   * @return ListMultimap of Integer > List<CompleteWork>, keyed by each endpoint's index in the incoming order
+   */
+  public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
+                                                                              List<T> units) {
+    OldAssignmentCreator<T> creator = new OldAssignmentCreator<T>(incomingEndpoints, units);
+    return creator.mappings;
+  }
+
+   OldAssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
+    Stopwatch watch = new Stopwatch();
+
+    Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
+        + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
+    this.mappings = ArrayListMultimap.create();
+    this.endpoints = Lists.newLinkedList(incomingEndpoints);
+
+    ArrayList<T> rowGroupList = new ArrayList<>(units);
+    for (double cutoff : ASSIGNMENT_CUTOFFS) {
+      scanAndAssign(rowGroupList, cutoff, false, false);
+    }
+    scanAndAssign(rowGroupList, 0.0, true, false);
+    scanAndAssign(rowGroupList, 0.0, true, true);
+
+    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+    Preconditions.checkState(!units.isEmpty());
+
+  }
+
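+  /**
+   * Scans the remaining work units once, adding to {@code mappings} (the mapping between fragment/endpoint and
+   * rowGroup) and reading {@code endpoints} (the list of drillbits, ordered by the corresponding fragment).
+   *
+   * @param workunits
+   *          the list of rowGroups to assign; assigned units are removed from it
+   * @param requiredPercentage
+   *          the percentage of max bytes required to make an assignment
+   * @param assignAllToEmpty
+   *          if true, will assign to a fragment that has no assignments yet, regardless of affinity
+   * @param assignAll
+   *          if true, will assign even if no affinity
+   */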
+  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
+    Collections.sort(workunits);
+    int fragmentPointer = 0;
+    final boolean requireAffinity = requiredPercentage > 0;
+    int maxAssignments = (int) (workunits.size() / endpoints.size());
+
+    if (maxAssignments < 1) {
+      maxAssignments = 1;
+    }
+
+    for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
+      T unit = iter.next();
+      for (int i = 0; i < endpoints.size(); i++) {
+        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+        EndpointByteMap endpointByteMap = unit.getByteMap();
+        boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
+
+        if (assignAll
+            || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
+            || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
+            && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
+            .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
+
+          mappings.put(minorFragmentId, unit);
+          logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
+          // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
+          // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+          // if (bytesPerEndpoint.get(currentEndpoint) != null) {
+          // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
+          // } else {
+          // // assignmentAffinityStats.update(0);
+          // }
+          iter.remove();
+          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
+          break;
+        }
+      }
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 65d8cf7..1efc793 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -63,7 +63,7 @@ public class TestAssignment {
       incomingEndpoints.add(incomingEndpointsIterator.next());
     }
 
-    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null);
     System.out.println(mappings.keySet().size());
     for (int i = 0; i < width; i++) {
       Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);