You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/30 23:16:31 UTC
[1/2] drill git commit: DRILL-2916: Correctly set minimum per fragment in AssignmentCreator
Repository: drill
Updated Branches:
refs/heads/master f29444fa2 -> f8a91f14b
DRILL-2916: Correctly set minimum per fragment in AssignmentCreator
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0f6c7ff5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0f6c7ff5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0f6c7ff5
Branch: refs/heads/master
Commit: 0f6c7ff506ba0ead612c77830e4b6daac9a7541c
Parents: f29444f
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Apr 30 11:02:33 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu Apr 30 11:06:29 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/store/schedule/AssignmentCreator.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0f6c7ff5/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index bfc104f..1f4d8cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -247,14 +247,14 @@ public class AssignmentCreator<T extends CompleteWork> {
FragIteratorWrapper wrapper = new FragIteratorWrapper();
wrapper.iter = Iterators.cycle(mmap.get(endpoint));
wrapper.maxCount = maxWork * mmap.get(endpoint).size();
- wrapper.minCount = Math.max((maxWork - 1) * mmap.get(endpoint).size(), 1);
+ wrapper.minCount = Math.max(maxWork - 1, 1) * mmap.get(endpoint).size();
map.put(endpoint, wrapper);
}
return map;
}
/**
- * A struct that holds an fragment iterator and keeps track of how many units have been assigned, as well as the maximum
+ * A struct that holds a fragment iterator and keeps track of how many units have been assigned, as well as the maximum
* number of assignment it will accept
*/
private static class FragIteratorWrapper {
[2/2] drill git commit: DRILL-2916: Allow system option to revert to old assignment logic
Posted by sm...@apache.org.
DRILL-2916: Allow system option to revert to old assignment logic
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f8a91f14
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f8a91f14
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f8a91f14
Branch: refs/heads/master
Commit: f8a91f14bcd8acc61de417b29d75cbb85789da25
Parents: 0f6c7ff
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Apr 30 12:34:54 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu Apr 30 13:57:48 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +
.../server/options/SystemOptionManager.java | 1 +
.../exec/store/dfs/easy/EasyGroupScan.java | 2 +-
.../exec/store/parquet/ParquetGroupScan.java | 2 +-
.../exec/store/schedule/AssignmentCreator.java | 13 +-
.../store/schedule/OldAssignmentCreator.java | 141 +++++++++++++++++++
.../drill/exec/store/store/TestAssignment.java | 2 +-
7 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 633f571..9f87b0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -227,4 +227,7 @@ public interface ExecConstants {
public static final String NEW_VIEW_DEFAULT_PERMS_KEY = "new_view_default_permissions";
public static final OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
+
+ public static final String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
+ public static final OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9127c7c..33b2a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -100,6 +100,7 @@ public class SystemOptionManager extends BaseOptionManager {
ExecConstants.HASH_AGG_TABLE_FACTOR,
ExecConstants.AVERAGE_FIELD_WIDTH,
ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
+ ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
QueryClassLoader.JAVA_COMPILER_VALIDATOR,
QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
QueryClassLoader.JAVA_COMPILER_DEBUG,
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 1b333ac..39dc073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -183,7 +183,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+ mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
}
private void createMappings(List<EndpointAffinity> affinities) {
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 37f19ac..cf39518 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -384,7 +384,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
- this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
+ this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 1f4d8cb..e76826d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -30,12 +30,14 @@ import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.server.DrillbitContext;
/**
* The AssignmentCreator is responsible for assigning a set of work units to the available slices.
@@ -89,9 +91,14 @@ public class AssignmentCreator<T extends CompleteWork> {
* @param units the list of work units to be assigned
* @return A multimap that maps each minor fragment id to a list of work units
*/
- public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
- AssignmentCreator<T> creator = new AssignmentCreator(incomingEndpoints, units);
- return creator.getMappings();
+ public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units, DrillbitContext context) {
+ // A null context (e.g. from tests) selects the new logic; otherwise the exec.schedule.assignment.old option decides.
+ boolean useOldAssignmentCode = context != null && context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val;
+ if (useOldAssignmentCode) {
+ return OldAssignmentCreator.getMappings(incomingEndpoints, units);
+ }
+ AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
+ return creator.getMappings();
+ }
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
new file mode 100644
index 0000000..858cfef
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+/**
+ * Assigns a set of work units to the available slices using the old assignment logic; AssignmentCreator delegates here when exec.schedule.assignment.old is true.
+ */
+public class OldAssignmentCreator<T extends CompleteWork> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OldAssignmentCreator.class);
+
+ // Affinity cutoffs tried in order: the fraction of a unit's max per-endpoint bytes an endpoint must hold.
+ static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
+ // minor fragment id -> work units assigned to it
+ private final ArrayListMultimap<Integer, T> mappings;
+ // one entry per minor fragment, indexed by minor fragment id
+ private final List<DrillbitEndpoint> endpoints;
+
+ /**
+ * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to Drillbits.
+ *
+ * @param incomingEndpoints the set of nodes to assign work to. Nodes can be listed multiple times if we want to
+ * have multiple slices on a node working on the task simultaneously.
+ * @param units the work units to assign
+ * @return ListMultimap from minor fragment id (the index of the endpoint in incomingEndpoints) to the work units
+ * assigned to it
+ * @throws IllegalArgumentException if incomingEndpoints is empty or has more entries than there are units
+ * @throws IllegalStateException if any unit remains unassigned after all passes
+ */
+ public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
+ List<T> units) {
+ OldAssignmentCreator<T> creator = new OldAssignmentCreator<T>(incomingEndpoints, units);
+ return creator.mappings;
+ }
+
+ OldAssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+ logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
+ Stopwatch watch = new Stopwatch().start();
+ Preconditions.checkArgument(!incomingEndpoints.isEmpty(), "No endpoints to assign %s units to", units.size());
+ Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), "Incoming endpoints %s "
+ + "is greater than number of row groups %s", incomingEndpoints.size(), units.size());
+ this.mappings = ArrayListMultimap.create();
+ // ArrayList, not LinkedList: endpoints are looked up by index for every unit in every pass.
+ this.endpoints = Lists.newArrayList(incomingEndpoints);
+
+ ArrayList<T> rowGroupList = new ArrayList<>(units);
+ // Affinity passes from the strictest cutoff down to 0.0 (no per-endpoint affinity check).
+ for (double cutoff : ASSIGNMENT_CUTOFFS) {
+ scanAndAssign(rowGroupList, cutoff, false, false);
+ }
+ // Then give every still-empty fragment a unit, and finally place whatever is left anywhere.
+ scanAndAssign(rowGroupList, 0.0, true, false);
+ scanAndAssign(rowGroupList, 0.0, true, true);
+ logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+ Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+ }
+
+ /**
+ * Makes one pass over the unassigned work units, assigning each to the first eligible minor fragment (searching
+ * round-robin from the fragment after the last assignment) and removing it from {@code workunits}.
+ *
+ * @param workunits
+ * the unassigned work units; sorted in place, and assigned units are removed
+ * @param requiredPercentage
+ * the fraction of a unit's max bytes an endpoint must hold to receive it; 0 skips the per-endpoint check
+ * @param assignAllToEmpty
+ * if true, will assign a unit to any minor fragment that has no assignments yet, regardless of affinity
+ * @param assignAll
+ * if true, will assign even if no affinity and regardless of the per-fragment maximum
+ */
+ private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
+ // Visit units in their natural (CompleteWork) order.
+ Collections.sort(workunits);
+ int fragmentPointer = 0;
+ // A zero cutoff skips the per-endpoint affinity check.
+ final boolean requireAffinity = requiredPercentage > 0;
+ // Per-fragment cap for affinity-based assignment; at least one so small inputs can still be placed.
+ final int maxAssignments = Math.max(workunits.size() / endpoints.size(), 1);
+
+ for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
+ T unit = iter.next();
+ EndpointByteMap endpointByteMap = unit.getByteMap();
+
+ // Probe every fragment once, starting just after the fragment that received the previous unit.
+ for (int i = 0; i < endpoints.size(); i++) {
+ int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+ DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+ boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
+ boolean fragmentIsEmpty = !mappings.containsKey(minorFragmentId);
+ boolean fragmentHasRoom = fragmentIsEmpty || mappings.get(minorFragmentId).size() < maxAssignments;
+ // Only look up the endpoint's bytes when it is known to hold some of this unit.
+ boolean meetsCutoff = !requireAffinity
+ || (haveAffinity
+ && endpointByteMap.get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage);
+
+ // assignAll: place anywhere; assignAllToEmpty: place on any empty fragment; otherwise the unit must have
+ // byte-map data, the fragment must have room, and (if a cutoff is set) the endpoint must hold enough of it.
+ if (assignAll
+ || (assignAllToEmpty && fragmentIsEmpty)
+ || (!endpointByteMap.isEmpty() && fragmentHasRoom && meetsCutoff)) {
+ mappings.put(minorFragmentId, unit);
+ logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
+ // Iterator.remove is the safe way to drop the now-assigned unit while iterating.
+ iter.remove();
+ fragmentPointer = (minorFragmentId + 1) % endpoints.size();
+ break;
+ }
+ }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8a91f14/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 65d8cf7..1efc793 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -63,7 +63,7 @@ public class TestAssignment {
incomingEndpoints.add(incomingEndpointsIterator.next());
}
- ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+ ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null);
System.out.println(mappings.keySet().size());
for (int i = 0; i < width; i++) {
Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);