You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2016/04/13 20:27:41 UTC
[1/2] drill git commit: DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin
Repository: drill
Updated Branches:
refs/heads/master 9f4fff800 -> e9b6e8f3d
DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin
+ Remove dead code in ParquetGroupScan
this closes #473
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e9b6e8f3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e9b6e8f3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e9b6e8f3
Branch: refs/heads/master
Commit: e9b6e8f3ddadbd308b85ed6d88bcf878147ee77e
Parents: 10afc70
Author: vkorukanti <ve...@dremio.com>
Authored: Thu Apr 7 14:23:07 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Wed Apr 13 10:36:21 2016 -0700
----------------------------------------------------------------------
.../drill/exec/store/kudu/KuduGroupScan.java | 2 +-
.../org/apache/drill/exec/ExecConstants.java | 3 -
.../server/options/SystemOptionManager.java | 1 -
.../exec/store/dfs/easy/EasyGroupScan.java | 2 +-
.../exec/store/parquet/ParquetGroupScan.java | 39 +----
.../exec/store/schedule/AssignmentCreator.java | 13 +-
.../store/schedule/OldAssignmentCreator.java | 141 -------------------
.../drill/exec/store/store/TestAssignment.java | 2 +-
8 files changed, 7 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index ff4295d..873f216 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -190,7 +190,7 @@ public class KuduGroupScan extends AbstractGroupScan {
*/
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext());
+ assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 963934d..a490116 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -267,9 +267,6 @@ public interface ExecConstants {
OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
- String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
- OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
-
String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute";
BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a596d3a..0abdb76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,6 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.HASH_AGG_TABLE_FACTOR,
ExecConstants.AVERAGE_FIELD_WIDTH,
ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
- ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR,
ExecConstants.ADMIN_USERS_VALIDATOR,
ExecConstants.ADMIN_USER_GROUPS_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index ebea2f4..7a80db3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -195,7 +195,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
+ mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
}
private void createMappings(List<EndpointAffinity> affinities) {
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 47172cc..5950b74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -34,7 +34,6 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
@@ -46,7 +45,6 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -59,7 +57,6 @@ import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
-import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
@@ -87,14 +84,12 @@ import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.joda.time.DateTimeUtils;
-import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
@@ -104,12 +99,8 @@ import com.google.common.collect.Sets;
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractFileGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
- static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-
private final List<ReadEntryWithPath> entries;
- private final Stopwatch watch = Stopwatch.createUnstarted();
private final ParquetFormatPlugin formatPlugin;
private final ParquetFormatConfig formatConfig;
private final DrillFileSystem fs;
@@ -716,8 +707,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
if (column.getNulls() != null) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
- } else {
-
}
}
} else {
@@ -790,36 +779,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
- private class BlockMapper extends TimedRunnable<Void> {
- private final BlockMapBuilder bmb;
- private final RowGroupInfo rgi;
-
- public BlockMapper(BlockMapBuilder bmb, RowGroupInfo rgi) {
- super();
- this.bmb = bmb;
- this.rgi = rgi;
- }
-
- @Override
- protected Void runInner() throws Exception {
- EndpointByteMap ebm = bmb.getEndpointByteMap(rgi);
- rgi.setEndpointByteMap(ebm);
- return null;
- }
-
- @Override
- protected IOException convertToIOException(Exception e) {
- return new IOException(String.format(
- "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(),
- rgi.getStart()));
- }
-
- }
-
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
- this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
+ this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
}
@Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 632cf66..eed200e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -27,9 +27,7 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import com.google.common.base.Stopwatch;
@@ -91,14 +89,9 @@ public class AssignmentCreator<T extends CompleteWork> {
* @param units the list of work units to be assigned
* @return A multimap that maps each minor fragment id to a list of work units
*/
- public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units, DrillbitContext context) {
- boolean useOldAssignmentCode = context == null ? false : context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val;
- if (useOldAssignmentCode) {
- return OldAssignmentCreator.getMappings(incomingEndpoints, units);
- } else {
- AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
- return creator.getMappings();
- }
+ public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+ AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
+ return creator.getMappings();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
deleted file mode 100644
index 48bb5f3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.schedule;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-
-/**
- * The OldAssignmentCreator is responsible for assigning a set of work units to the available slices.
- */
-public class OldAssignmentCreator<T extends CompleteWork> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
-
- static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
- private final ArrayListMultimap<Integer, T> mappings;
- private final List<DrillbitEndpoint> endpoints;
-
-
-
-
- /**
- * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
- * Drillbits.
- *
- * @param incomingEndpoints
- * The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
- * multiple slices on a node working on the task simultaneously.
- * @param units
- * The work units to assign.
- * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
- */
- public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
- List<T> units) {
- OldAssignmentCreator<T> creator = new OldAssignmentCreator<T>(incomingEndpoints, units);
- return creator.mappings;
- }
-
- OldAssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
- logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
- Stopwatch watch = Stopwatch.createUnstarted();
-
- Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
- + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
- this.mappings = ArrayListMultimap.create();
- this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
- ArrayList<T> rowGroupList = new ArrayList<>(units);
- for (double cutoff : ASSIGNMENT_CUTOFFS) {
- scanAndAssign(rowGroupList, cutoff, false, false);
- }
- scanAndAssign(rowGroupList, 0.0, true, false);
- scanAndAssign(rowGroupList, 0.0, true, true);
-
- logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
- Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
- Preconditions.checkState(!units.isEmpty());
-
- }
-
- /**
- *
- * @param mappings
- * the mapping between fragment/endpoint and rowGroup
- * @param endpoints
- * the list of drillbits, ordered by the corresponding fragment
- * @param workunits
- * the list of rowGroups to assign
- * @param requiredPercentage
- * the percentage of max bytes required to make an assignment
- * @param assignAll
- * if true, will assign even if no affinity
- */
- private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
- Collections.sort(workunits);
- int fragmentPointer = 0;
- final boolean requireAffinity = requiredPercentage > 0;
- int maxAssignments = (int) (workunits.size() / endpoints.size());
-
- if (maxAssignments < 1) {
- maxAssignments = 1;
- }
-
- for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
- T unit = iter.next();
- for (int i = 0; i < endpoints.size(); i++) {
- int minorFragmentId = (fragmentPointer + i) % endpoints.size();
- DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
- EndpointByteMap endpointByteMap = unit.getByteMap();
- boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
- if (assignAll
- || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
- || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
- && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
- .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
- mappings.put(minorFragmentId, unit);
- logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
- // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
- // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
- // if (bytesPerEndpoint.get(currentEndpoint) != null) {
- // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
- // } else {
- // // assignmentAffinityStats.update(0);
- // }
- iter.remove();
- fragmentPointer = (minorFragmentId + 1) % endpoints.size();
- break;
- }
- }
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 1efc793..65d8cf7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -63,7 +63,7 @@ public class TestAssignment {
incomingEndpoints.add(incomingEndpointsIterator.next());
}
- ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null);
+ ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
System.out.println(mappings.keySet().size());
for (int i = 0; i < width; i++) {
Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);
[2/2] drill git commit: DRILL-4446: Support mandatory work assignment to endpoint requirements of operators
Posted by ve...@apache.org.
DRILL-4446: Support mandatory work assignment to endpoint requirements of operators
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/10afc708
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/10afc708
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/10afc708
Branch: refs/heads/master
Commit: 10afc708600ea9f4cb0e7c2cd981b5b1001fea0d
Parents: 9f4fff8
Author: vkorukanti <ve...@gmail.com>
Authored: Wed Mar 2 13:08:36 2016 -0800
Committer: vkorukanti <ve...@dremio.com>
Committed: Wed Apr 13 10:36:21 2016 -0700
----------------------------------------------------------------------
.../drill/exec/physical/EndpointAffinity.java | 54 ++++-
.../exec/physical/base/AbstractGroupScan.java | 5 +
.../drill/exec/physical/base/AbstractStore.java | 6 +-
.../drill/exec/physical/base/HasAffinity.java | 9 +-
.../drill/exec/physical/config/Screen.java | 7 +-
.../planner/fragment/DistributionAffinity.java | 63 +++++
.../planner/fragment/FragmentParallelizer.java | 39 ++++
.../HardAffinityFragmentParallelizer.java | 150 ++++++++++++
.../planner/fragment/ParallelizationInfo.java | 9 +-
.../fragment/ParallelizationParameters.java | 44 ++++
.../planner/fragment/SimpleParallelizer.java | 154 ++----------
.../SoftAffinityFragmentParallelizer.java | 167 +++++++++++++
.../drill/exec/planner/fragment/Stats.java | 12 +-
.../exec/planner/fragment/StatsCollector.java | 4 +-
.../exec/planner/physical/DrillScanPrel.java | 2 +-
.../physical/HasDistributionAffinity.java | 28 +++
.../drill/exec/planner/physical/ScanPrel.java | 6 +
.../drill/exec/planner/physical/ScanPrule.java | 5 +-
.../drill/exec/planner/physical/ScreenPrel.java | 7 +-
.../visitor/ExcessiveExchangeIdentifier.java | 17 +-
.../planner/sql/handlers/FindLimit0Visitor.java | 46 +++-
.../drill/exec/store/sys/SystemTableScan.java | 15 +-
.../TestHardAffinityFragmentParallelizer.java | 232 +++++++++++++++++++
.../drill/exec/store/sys/TestSystemTable.java | 6 +
24 files changed, 937 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index ce97a87..69f7b51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -31,6 +31,14 @@ public class EndpointAffinity {
private final DrillbitEndpoint endpoint;
private double affinity = 0.0d;
+ // Requires including this endpoint at least once? Default is not required.
+ private boolean mandatory;
+
+ /**
+ * Maximum allowed assignments for this endpoint. Default is {@link Integer#MAX_VALUE}
+ */
+ private int maxWidth = Integer.MAX_VALUE;
+
/**
* Create EndpointAffinity instance for given Drillbit endpoint. Affinity is initialized to 0. Affinity can be added
* after EndpointAffinity object creation using {@link #addAffinity(double)}.
@@ -54,6 +62,22 @@ public class EndpointAffinity {
}
/**
+ * Creates EndpointAffinity instance for given DrillbitEndpoint, affinity and mandatory assignment requirement flag.
+ * @param endpoint Drillbit endpoint
+ * @param affinity Initial affinity value
+ * @param mandatory Is this endpoint requires at least one mandatory assignment?
+ * @param maxWidth Maximum allowed assignments for this endpoint.
+ */
+ public EndpointAffinity(final DrillbitEndpoint endpoint, final double affinity, final boolean mandatory,
+ final int maxWidth) {
+ Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
+ this.endpoint = endpoint;
+ this.affinity = affinity;
+ this.mandatory = mandatory;
+ this.maxWidth = maxWidth;
+ }
+
+ /**
* Return the Drillbit endpoint in this instance.
*
* @return Drillbit endpoint.
@@ -87,12 +111,35 @@ public class EndpointAffinity {
}
/**
+ * Set the endpoint requires at least one assignment.
+ */
+ public void setAssignmentRequired() {
+ mandatory = true;
+ }
+
+ /**
* Is this endpoint required to be in fragment endpoint assignment list?
*
* @return Returns true for mandatory assignment, false otherwise.
*/
public boolean isAssignmentRequired() {
- return Double.POSITIVE_INFINITY == affinity;
+ return mandatory || Double.POSITIVE_INFINITY == affinity;
+ }
+
+ /**
+ * @return Maximum allowed assignments for this endpoint.
+ */
+ public int getMaxWidth() {
+ return maxWidth;
+ }
+
+ /**
+ * Set the new max width as the minimum of the the given value and current max width.
+ * @param maxWidth
+ */
+ public void setMaxWidth(final int maxWidth) {
+ Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
+ this.maxWidth = Math.min(this.maxWidth, maxWidth);
}
@Override
@@ -128,11 +175,12 @@ public class EndpointAffinity {
} else if (!endpoint.equals(other.endpoint)) {
return false;
}
- return true;
+ return mandatory == other.mandatory;
}
@Override
public String toString() {
- return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
+ return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity +
+ ", mandatory=" + mandatory + ", maxWidth=" + maxWidth + "]";
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 57fbd00..ac42766 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -162,4 +163,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
return null;
}
+ @Override
+ public DistributionAffinity getDistributionAffinity() {
+ return DistributionAffinity.SOFT;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
index 41fbe57..4edda22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.physical.base;
-
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
public abstract class AbstractStore extends AbstractSingle implements Store, Root{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
@@ -33,4 +33,8 @@ public abstract class AbstractStore extends AbstractSingle implements Store, Roo
}
+ @Override
+ public DistributionAffinity getDistributionAffinity() {
+ return DistributionAffinity.SOFT;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
index 52462db..6b19173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.exec.physical.EndpointAffinity;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
/**
* Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
@@ -33,5 +34,11 @@ public interface HasAffinity extends PhysicalOperator {
* @return List of EndpointAffinity objects.
*/
@JsonIgnore
- public List<EndpointAffinity> getOperatorAffinity();
+ List<EndpointAffinity> getOperatorAffinity();
+
+ /**
+ * Get distribution affinity which describes the parallelization strategy of the operator.
+ */
+ @JsonIgnore
+ DistributionAffinity getDistributionAffinity();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 97c2405..4eda79e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractStore;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -48,7 +49,7 @@ public class Screen extends AbstractStore {
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.singletonList(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+ return Collections.singletonList(new EndpointAffinity(endpoint, 1, true, /* maxWidth = */ 1));
}
@Override
@@ -102,4 +103,8 @@ public class Screen extends AbstractStore {
return CoreOperatorType.SCREEN_VALUE;
}
+ @Override
+ public DistributionAffinity getDistributionAffinity() {
+ return DistributionAffinity.HARD;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
new file mode 100644
index 0000000..d26d413
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+/**
+ * Describes an operator's endpoint assignment requirements. Constants are declared in order of increasingly
+ * restrictive assignment requirements, from no requirement ({@link #NONE}) to mandatory assignment ({@link #HARD}).
+ * New constants must keep this order because {@link #isLessRestrictiveThan(DistributionAffinity)} compares
+ * declaration order.
+ */
+public enum DistributionAffinity {
+  /**
+   * No affinity to any endpoints. Operator can run on any endpoint.
+   */
+  NONE(SoftAffinityFragmentParallelizer.INSTANCE),
+
+  /**
+   * Operator has soft distribution affinity to one or more endpoints. Operator performs better when fragments are
+   * assigned to the endpoints with affinity, but this is not a mandatory requirement.
+   */
+  SOFT(SoftAffinityFragmentParallelizer.INSTANCE),
+
+  /**
+   * Hard distribution affinity to one or more endpoints. Fragments having the operator must be scheduled on the nodes
+   * with affinity.
+   */
+  HARD(HardAffinityFragmentParallelizer.INSTANCE);
+
+  private final FragmentParallelizer fragmentParallelizer;
+
+  DistributionAffinity(final FragmentParallelizer fragmentParallelizer) {
+    this.fragmentParallelizer = fragmentParallelizer;
+  }
+
+  /**
+   * @return {@link FragmentParallelizer} implementation used to parallelize fragments that have this affinity.
+   */
+  public FragmentParallelizer getFragmentParallelizer() {
+    return fragmentParallelizer;
+  }
+
+  /**
+   * Is the current DistributionAffinity less restrictive than the given DistributionAffinity?
+   *
+   * @param distributionAffinity affinity to compare against; must not be null (null throws NullPointerException)
+   * @return true if this constant is declared before {@code distributionAffinity} (strictly weaker assignment
+   *         requirement), false if it is equally or more restrictive
+   */
+  public boolean isLessRestrictiveThan(final DistributionAffinity distributionAffinity) {
+    // Enum#compareTo follows declaration order, which this enum's contract ties to restrictiveness.
+    return compareTo(distributionAffinity) < 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
new file mode 100644
index 0000000..5237098
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+
+/**
+ * Generic interface to provide different parallelization strategies for MajorFragments. The strategy applied to a
+ * fragment is chosen through {@link DistributionAffinity#getFragmentParallelizer()}.
+ */
+public interface FragmentParallelizer {
+  /**
+   * Parallelize the given fragment: decide its width and which endpoint each unit of parallelization runs on.
+   * Implementations are expected to record the result on {@code fragment} (its width and assigned endpoints).
+   *
+   * @param fragment wrapper of the major fragment to parallelize; implementations read its {@link Stats}, so stats
+   *                 are expected to be collected before this call
+   * @param parameters query-level parallelization settings (slice target, width limits, affinity factor)
+   * @param activeEndpoints endpoints currently available to run fragments
+   * @throws PhysicalOperatorSetupException if the fragment cannot be parallelized under the given constraints
+   */
+  void parallelizeFragment(Wrapper fragment, ParallelizationParameters parameters,
+      Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..550dcb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment requires running on a given set of endpoints. Width
+ * per node depends on the affinity to the endpoint and total width (calculated using costs).
+ */
+public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
+  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(HardAffinityFragmentParallelizer.class);
+
+  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
+
+  private static final String EOL = System.getProperty("line.separator");
+
+  private HardAffinityFragmentParallelizer() { /* singleton */}
+
+  /**
+   * Assigns the fragment only to endpoints whose {@link EndpointAffinity} requires an assignment. Each such endpoint
+   * gets one unit first; the remaining units are spread in proportion to affinity, bounded by the per-node width
+   * limit and each endpoint's own max width.
+   *
+   * @throws PhysicalOperatorSetupException if no endpoint requires an assignment, if the mandatory endpoints do not
+   *         fit in the allowed width, or if the remaining units cannot be placed
+   */
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
+
+    int totalMaxWidth = 0;
+
+    // Go through the affinity map and extract the endpoints that have mandatory assignment requirement
+    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
+      if (entry.getValue().isAssignmentRequired()) {
+        endpointPool.put(entry.getKey(), entry.getValue());
+
+        // Limit the max width of the endpoint to allowed max width.
+        totalMaxWidth += Math.min(parameters.getMaxWidthPerNode(), entry.getValue().getMaxWidth());
+        if (totalMaxWidth < 0) {
+          // If the totalWidth overflows, just keep it at the max value.
+          totalMaxWidth = Integer.MAX_VALUE;
+        }
+      }
+    }
+
+    // Without any mandatory endpoint totalMaxWidth stays 0, which would shrink the width to 0 in step 1.6 and leave
+    // the fragment with no assignment at all. Fail with a clear message instead.
+    checkOrThrow(!endpointPool.isEmpty(), logger,
+        "Fragment has hard distribution affinity, but none of its endpoints requires an assignment. " +
+            "Endpoint affinity map: {}", pInfo.getEndpointAffinityMap());
+
+    // Step 1: Find the width taking various parameters into account
+    // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
+    // with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
+
+    // 1.2. Make sure the width is at least the number of endpoints that require an assignment
+    width = Math.max(endpointPool.size(), width);
+
+    // 1.3. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
+    checkOrThrow(endpointPool.size() <= width, logger,
+        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed fragment max " +
+            "width ({}).", endpointPool.size(), pInfo.getMaxWidth());
+
+    // 1.4 Cap the parallelization width by global max query width
+    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
+    checkOrThrow(endpointPool.size() <= width, logger,
+        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed global query " +
+            "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
+
+    // 1.5 Cap the parallelization width by max allowed parallelization per node. The product is computed in long
+    // arithmetic so that a large per-node limit cannot overflow int.
+    width = Math.max(1, (int) Math.min(width, (long) endpointPool.size() * parameters.getMaxWidthPerNode()));
+
+    // 1.6 Cap the parallelization width by total of max allowed width per node. If the width were larger, we would
+    // end up allocating more work units to one or more endpoints than they can take.
+    width = Math.min(totalMaxWidth, width);
+
+    // Step 2: Select the endpoints
+    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
+
+    // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
+      endpoints.put(entry.getKey(), 1);
+    }
+    int totalAssigned = endpoints.size();
+
+    // 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint
+    int remainingSlots = width - endpoints.size();
+    while (remainingSlots > 0) {
+      for(EndpointAffinity epAf : endpointPool.values()) {
+        final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
+        int currentAssignments = endpoints.get(epAf.getEndpoint());
+        for(int i=0;
+            i < moreAllocation &&
+                totalAssigned < width &&
+                currentAssignments < parameters.getMaxWidthPerNode() &&
+                currentAssignments < epAf.getMaxWidth();
+            i++) {
+          totalAssigned++;
+          currentAssignments++;
+        }
+        endpoints.put(epAf.getEndpoint(), currentAssignments);
+      }
+      final int previousRemainingSlots = remainingSlots;
+      remainingSlots = width - totalAssigned;
+      if (previousRemainingSlots == remainingSlots) {
+        // No endpoint could take another unit in this round, so further rounds cannot make progress either.
+        logger.error("Can't parallelize fragment: " +
+            "Every mandatory node has exhausted the maximum width per node limit." + EOL +
+            "Endpoint pool: {}" + EOL + "Assignment so far: {}" + EOL + "Width: {}", endpointPool, endpoints, width);
+        throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
+      }
+    }
+
+    final List<DrillbitEndpoint> assignedEndpoints = Lists.newArrayList();
+    for(Entry<DrillbitEndpoint, Integer> entry : endpoints.entrySet()) {
+      for(int i=0; i < entry.getValue(); i++) {
+        assignedEndpoints.add(entry.getKey());
+      }
+    }
+
+    fragmentWrapper.setWidth(width);
+    fragmentWrapper.assignEndpoints(assignedEndpoints);
+  }
+
+  /**
+   * Logs {@code errMsg} (an SLF4J pattern filled with {@code args}) and throws if {@code expr} is false.
+   */
+  private static void checkOrThrow(final boolean expr, final Logger logger, final String errMsg, Object... args)
+      throws PhysicalOperatorSetupException {
+    if (!expr) {
+      logger.error(errMsg, args);
+      throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
index 8e775af..ffa843c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
@@ -121,8 +121,13 @@ public class ParallelizationInfo {
// Helper method to add the given EndpointAffinity to the global affinity map
private void addEndpointAffinity(EndpointAffinity epAff) {
- if (affinityMap.containsKey(epAff.getEndpoint())) {
- affinityMap.get(epAff.getEndpoint()).addAffinity(epAff.getAffinity());
+ final EndpointAffinity epAffAgg = affinityMap.get(epAff.getEndpoint()); // single lookup; null when this endpoint has no aggregate yet
+ if (epAffAgg != null) {
+ epAffAgg.addAffinity(epAff.getAffinity());
+ if (epAff.isAssignmentRequired()) { // once any contributor requires an assignment, the aggregate does too (never cleared here)
+ epAffAgg.setAssignmentRequired();
+ }
+ epAffAgg.setMaxWidth(epAff.getMaxWidth()); // NOTE(review): plain overwrite unless setMaxWidth keeps the minimum internally - confirm in EndpointAffinity
} else {
affinityMap.put(epAff.getEndpoint(), epAff);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
new file mode 100644
index 0000000..ea5d9e8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+/**
+ * Interface to implement for passing parameters to {@link FragmentParallelizer}.
+ */
+public interface ParallelizationParameters {
+
+  /**
+   * @return Configured slice target: the cost handled by one unit of parallelization. Parallelizers divide a
+   *         fragment's max operator cost by this value to derive the fragment's width.
+   */
+  long getSliceTarget();
+
+  /**
+   * @return Configured maximum allowed number of parallelization units per node.
+   */
+  int getMaxWidthPerNode();
+
+  /**
+   * @return Configured maximum allowed number of parallelization units across all nodes in the cluster.
+   */
+  int getMaxGlobalWidth();
+
+  /**
+   * @return Factor by which a node with endpoint affinity will be favored while creating assignments.
+   */
+  double getAffinityFactor();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index e04a8a2..9aad9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -19,19 +19,13 @@ package org.apache.drill.exec.planner.fragment;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -57,9 +51,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
/**
@@ -68,17 +60,9 @@ import com.google.common.collect.Sets;
* parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment
* is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
*/
-public class SimpleParallelizer {
+public class SimpleParallelizer implements ParallelizationParameters { // passed as 'this' to FragmentParallelizer.parallelizeFragment
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
- private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator<EndpointAffinity>() {
- @Override
- public int compare(EndpointAffinity o1, EndpointAffinity o2) {
- // Sort in descending order of affinity values
- return Double.compare(o2.getAffinity(), o1.getAffinity());
- }
- });
-
private final long parallelizationThreshold;
private final int maxWidthPerNode;
private final int maxGlobalWidth;
@@ -100,6 +84,25 @@ public class SimpleParallelizer {
this.affinityFactor = affinityFactor;
}
+ @Override
+ public long getSliceTarget() { // the slice target is the configured parallelizationThreshold (cost divisor for width)
+ return parallelizationThreshold;
+ }
+
+ @Override
+ public int getMaxWidthPerNode() {
+ return maxWidthPerNode;
+ }
+
+ @Override
+ public int getMaxGlobalWidth() {
+ return maxGlobalWidth;
+ }
+
+ @Override
+ public double getAffinityFactor() {
+ return affinityFactor;
+ }
/**
* Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
@@ -209,120 +212,13 @@ public class SimpleParallelizer {
}
}
- Fragment fragment = fragmentWrapper.getNode();
-
- // Step 1: Find stats. Stats include various factors including cost of physical operators, parallelizability of
+ // Find stats. Stats include various factors including cost of physical operators, parallelizability of
// work in physical operator and affinity of physical operator to certain nodes.
- fragment.getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
-
- // Step 2: Find the parallelization width of fragment
-
- final Stats stats = fragmentWrapper.getStats();
- final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
-
- // 2.1 Use max cost of all operators in this fragment; this is consistent with the
- // calculation that ExcessiveExchangeRemover uses
- // 2.1. Find the parallelization based on cost
- int width = (int) Math.ceil(stats.getMaxCost() / parallelizationThreshold);
-
- // 2.2. Cap the parallelization width by fragment level width limit and system level per query width limit
- width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), maxGlobalWidth));
-
- // 2.3. Cap the parallelization width by system level per node width limit
- width = Math.min(width, maxWidthPerNode * activeEndpoints.size());
-
- // 2.4. Make sure width is at least the min width enforced by operators
- width = Math.max(parallelizationInfo.getMinWidth(), width);
-
- // 2.4. Make sure width is at most the max width enforced by operators
- width = Math.min(parallelizationInfo.getMaxWidth(), width);
-
- // 2.5 Finally make sure the width is at least one
- width = Math.max(1, width);
-
- fragmentWrapper.setWidth(width);
-
- List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
- parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth());
- fragmentWrapper.assignEndpoints(assignedEndpoints);
- }
-
- // Assign endpoints based on the given endpoint list, affinity map and width.
- private List<DrillbitEndpoint> findEndpoints(Collection<DrillbitEndpoint> activeEndpoints,
- Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width)
- throws PhysicalOperatorSetupException {
-
- final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
-
- if (endpointAffinityMap.size() > 0) {
- // Get EndpointAffinity list sorted in descending order of affinity values
- List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());
-
- // Find the number of mandatory nodes (nodes with +infinity affinity).
- int numRequiredNodes = 0;
- for(EndpointAffinity ep : sortedAffinityList) {
- if (ep.isAssignmentRequired()) {
- numRequiredNodes++;
- } else {
- // As the list is sorted in descending order of affinities, we don't need to go beyond the first occurrance
- // of non-mandatory node
- break;
- }
- }
-
- if (width < numRequiredNodes) {
- throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
- "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
- }
-
- // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
- int affinedSlots =
- Math.max(1, (int) (affinityFactor * width / activeEndpoints.size())) * sortedAffinityList.size();
-
- // Make sure affined slots is at least the number of mandatory nodes
- affinedSlots = Math.max(affinedSlots, numRequiredNodes);
-
- // Cap the affined slots to max parallelization width
- affinedSlots = Math.min(affinedSlots, width);
-
- Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
-
- // Keep adding until we have selected "affinedSlots" number of endpoints.
- while(endpoints.size() < affinedSlots) {
- EndpointAffinity ea = affinedEPItr.next();
- endpoints.add(ea.getEndpoint());
- }
- }
-
- // add remaining endpoints if required
- if (endpoints.size() < width) {
- // Get a list of endpoints that are not part of the affinity endpoint list
- List<DrillbitEndpoint> endpointsWithNoAffinity;
- final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
-
- if (endpointAffinityMap.size() > 0) {
- endpointsWithNoAffinity = Lists.newArrayList();
- for (DrillbitEndpoint ep : activeEndpoints) {
- if (!endpointsWithAffinity.contains(ep)) {
- endpointsWithNoAffinity.add(ep);
- }
- }
- } else {
- endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an
- // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't
- // support immutable copy as input.
- }
-
- // round robin with random start.
- Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());
- Iterator<DrillbitEndpoint> otherEPItr =
- Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity);
- while (endpoints.size() < width) {
- endpoints.add(otherEPItr.next());
- }
- }
+ fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
- return endpoints;
+ fragmentWrapper.getStats().getDistributionAffinity() // Stats retains the most restrictive affinity it was given; that value picks the parallelizer
+ .getFragmentParallelizer()
+ .parallelizeFragment(fragmentWrapper, this, activeEndpoints); // 'this' supplies the ParallelizationParameters
}
private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..1ebed86
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints with affinities. Width
+ * per node depends on the affinity to the endpoint and total width (calculated using costs). Based on various
+ * factors, endpoints which have no affinity can be assigned to run the fragments.
+ */
+public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
+  public static final SoftAffinityFragmentParallelizer INSTANCE = new SoftAffinityFragmentParallelizer();
+
+  /**
+   * Orders endpoints that require an assignment first, then by descending affinity. The assignment requirement is
+   * tracked separately from the (summed) affinity value, so sorting by affinity alone could place a mandatory
+   * endpoint after optional endpoints with a higher affinity.
+   */
+  private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING =
+      Ordering.from(new Comparator<EndpointAffinity>() {
+        @Override
+        public int compare(EndpointAffinity o1, EndpointAffinity o2) {
+          // Endpoints that require an assignment sort first (true before false)
+          final int byRequirement = Boolean.compare(o2.isAssignmentRequired(), o1.isAssignmentRequired());
+          if (byRequirement != 0) {
+            return byRequirement;
+          }
+          // Then sort in descending order of affinity values
+          return Double.compare(o2.getAffinity(), o1.getAffinity());
+        }
+      });
+
+  /**
+   * Computes the fragment width from its cost and the configured/operator width limits, then assigns endpoints:
+   * first endpoints with affinity (mandatory ones first), then remaining units round robin over endpoints without
+   * affinity (falling back to the endpoints with affinity when no active endpoint lacks affinity).
+   */
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+    // Find the parallelization width of fragment
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
+
+    // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
+    // with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
+
+    // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
+
+    // 3. Cap the parallelization width by system level per node width limit
+    width = Math.min(width, parameters.getMaxWidthPerNode() * activeEndpoints.size());
+
+    // 4. Make sure width is at least the min width enforced by operators
+    width = Math.max(parallelizationInfo.getMinWidth(), width);
+
+    // 5. Make sure width is at most the max width enforced by operators
+    width = Math.min(parallelizationInfo.getMaxWidth(), width);
+
+    // 6. Finally make sure the width is at least one
+    width = Math.max(1, width);
+
+    fragmentWrapper.setWidth(width);
+
+    final List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
+        parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth(), parameters);
+
+    fragmentWrapper.assignEndpoints(assignedEndpoints);
+  }
+
+  // Assign endpoints based on the given endpoint list, affinity map and width.
+  private List<DrillbitEndpoint> findEndpoints(final Collection<DrillbitEndpoint> activeEndpoints,
+      final Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width,
+      final ParallelizationParameters parameters)
+      throws PhysicalOperatorSetupException {
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+
+    if (endpointAffinityMap.size() > 0) {
+      // Get EndpointAffinity list with mandatory endpoints first, each group in descending order of affinity values
+      List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());
+
+      // Find the number of mandatory nodes (nodes that require an assignment).
+      int numRequiredNodes = 0;
+      for(EndpointAffinity ep : sortedAffinityList) {
+        if (ep.isAssignmentRequired()) {
+          numRequiredNodes++;
+        } else {
+          // As mandatory nodes are sorted to the front of the list, we don't need to go beyond the first occurrence
+          // of a non-mandatory node
+          break;
+        }
+      }
+
+      if (width < numRequiredNodes) {
+        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
+            "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
+      }
+
+      // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
+      int affinedSlots =
+          Math.max(1, (int) (parameters.getAffinityFactor() * width / activeEndpoints.size())) * sortedAffinityList.size();
+
+      // Make sure affined slots is at least the number of mandatory nodes
+      affinedSlots = Math.max(affinedSlots, numRequiredNodes);
+
+      // Cap the affined slots to max parallelization width
+      affinedSlots = Math.min(affinedSlots, width);
+
+      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
+
+      // Keep adding until we have selected "affinedSlots" number of endpoints. Mandatory endpoints lead the cycle and
+      // affinedSlots >= numRequiredNodes, so every mandatory endpoint is selected at least once.
+      while(endpoints.size() < affinedSlots) {
+        EndpointAffinity ea = affinedEPItr.next();
+        endpoints.add(ea.getEndpoint());
+      }
+    }
+
+    // add remaining endpoints if required
+    if (endpoints.size() < width) {
+      // Get a list of endpoints that are not part of the affinity endpoint list
+      List<DrillbitEndpoint> endpointsWithNoAffinity;
+      final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
+
+      if (endpointAffinityMap.size() > 0) {
+        endpointsWithNoAffinity = Lists.newArrayList();
+        for (DrillbitEndpoint ep : activeEndpoints) {
+          if (!endpointsWithAffinity.contains(ep)) {
+            endpointsWithNoAffinity.add(ep);
+          }
+        }
+      } else {
+        endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an
+        // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't
+        // support immutable copy as input.
+      }
+
+      // round robin with random start.
+      Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());
+      Iterator<DrillbitEndpoint> otherEPItr =
+          Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity);
+      while (endpoints.size() < width) {
+        endpoints.add(otherEPItr.next());
+      }
+    }
+
+    return endpoints;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
index b5b8ce4..03f16b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.planner.fragment;
-
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.planner.fragment.ParallelizationInfo.ParallelizationInfoCollector;
@@ -26,6 +25,7 @@ import java.util.List;
public class Stats {
private final ParallelizationInfoCollector collector = new ParallelizationInfoCollector();
private double maxCost = 0.0;
+ private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
public void addParallelizationInfo(ParallelizationInfo parallelizationInfo) {
collector.add(parallelizationInfo);
@@ -43,6 +43,12 @@ public class Stats {
collector.addMinWidth(minWidth);
}
+ /**
+ * Records the affinity reported by an operator of this fragment, replacing the currently held value only when the
+ * held value is less restrictive than the argument.
+ *
+ * @param distributionAffinity affinity reported by an operator in this fragment
+ */
+ public void setDistributionAffinity(final DistributionAffinity distributionAffinity) {
+ final DistributionAffinity held = this.distributionAffinity;
+ this.distributionAffinity = held.isLessRestrictiveThan(distributionAffinity) ? distributionAffinity : held;
+ }
+
public void addEndpointAffinities(List<EndpointAffinity> endpointAffinityList) {
collector.addEndpointAffinities(endpointAffinityList);
}
@@ -59,4 +65,8 @@ public class Stats {
public double getMaxCost() {
return maxCost;
}
+
+ /**
+ * @return the affinity retained by {@link #setDistributionAffinity(DistributionAffinity)};
+ * {@link DistributionAffinity#NONE} if nothing more restrictive has been recorded
+ */
+ public DistributionAffinity getDistributionAffinity() {
+ return this.distributionAffinity;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 4f4e0b5..74031e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -101,7 +101,9 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
final Stats stats = wrapper.getStats();
if (op instanceof HasAffinity) {
- stats.addEndpointAffinities(((HasAffinity)op).getOperatorAffinity());
+ final HasAffinity hasAffinity = (HasAffinity)op;
+ stats.addEndpointAffinities(hasAffinity.getOperatorAffinity());
+ stats.setDistributionAffinity(hasAffinity.getDistributionAffinity());
}
stats.addCost(op.getCost());
for (PhysicalOperator child : op) {
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
index a452bac..ae236c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.planner.physical;
import org.apache.drill.exec.physical.base.GroupScan;
-public interface DrillScanPrel extends Prel{
+public interface DrillScanPrel extends Prel, HasDistributionAffinity{
public GroupScan getGroupScan();
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
new file mode 100644
index 0000000..42eed19
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+
+/**
+ * Implemented by a {@link Prel} that has distribution affinity requirements, for example a screen (which always
+ * reports {@link DistributionAffinity#HARD}) or a scan (which reports its group scan's affinity).
+ */
+public interface HasDistributionAffinity {
+
+ /**
+ * @return the distribution affinity required by this rel
+ */
+ DistributionAffinity getDistributionAffinity();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index b100d90..0d42a69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.calcite.rel.AbstractRelNode;
@@ -161,4 +162,9 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
public boolean needsFinalColumnReordering() {
return true;
}
+
+ /**
+ * Delegates to the wrapped group scan, which decides the distribution affinity of this scan.
+ */
+ @Override
+ public DistributionAffinity getDistributionAffinity() {
+ final GroupScan scan = this.groupScan;
+ return scan.getDistributionAffinity();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index ee2bde5..d74edf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.physical;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.calcite.plan.RelOptRule;
@@ -37,7 +38,9 @@ public class ScanPrule extends Prule{
GroupScan groupScan = scan.getGroupScan();
- DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
+ DrillDistributionTrait partition =
+ (groupScan.getMaxParallelizationWidth() > 1 || groupScan.getDistributionAffinity() == DistributionAffinity.HARD)
+ ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition);
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index ef77dff..f2d10d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -24,13 +24,14 @@ import java.util.List;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.planner.common.DrillScreenRelBase;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
-public class ScreenPrel extends DrillScreenRelBase implements Prel {
+public class ScreenPrel extends DrillScreenRelBase implements Prel, HasDistributionAffinity {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenPrel.class);
@@ -79,4 +80,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel {
return false;
}
+ /**
+ * The screen always reports {@link DistributionAffinity#HARD}.
+ */
+ @Override
+ public DistributionAffinity getDistributionAffinity() {
+ final DistributionAffinity screenAffinity = DistributionAffinity.HARD;
+ return screenAffinity;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7686d56..2e95e7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
import java.util.Collections;
import java.util.List;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -48,7 +49,10 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
MajorFragmentStat newFrag = new MajorFragmentStat();
Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
- if (newFrag.isSingular() && parent.isSingular()) {
+ if (newFrag.isSingular() && parent.isSingular() &&
+ // if one of them has strict distribution or none, we can remove the exchange
+ (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
+ ) {
return newChild;
} else {
return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode) newChild));
@@ -57,7 +61,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
@Override
public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws RuntimeException {
- s.setSingular();
+ s.addScreen(prel);
RelNode child = ((Prel)prel.getInput()).accept(this, s);
return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList(child));
}
@@ -92,6 +96,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
}
class MajorFragmentStat {
+ private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
private double maxRows = 0d;
private int maxWidth = Integer.MAX_VALUE;
private boolean isMultiSubScan = false;
@@ -100,13 +105,15 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
maxRows = Math.max(prel.getRows(), maxRows);
}
- public void setSingular() {
+ public void addScreen(ScreenPrel screenPrel) {
maxWidth = 1;
+ distributionAffinity = screenPrel.getDistributionAffinity();
}
+ /**
+ * Folds a scan of this major fragment into the stats: narrows the max width, records whether the scan needs
+ * multiple sub scans, and keeps the most restrictive distribution affinity seen so far.
+ */
public void addScan(ScanPrel prel) {
maxWidth = Math.min(maxWidth, prel.getGroupScan().getMaxParallelizationWidth());
isMultiSubScan = prel.getGroupScan().getMinParallelizationWidth() > 1;
+ // Only raise the affinity (same rule as Stats#setDistributionAffinity). Overwriting unconditionally would let a
+ // later, less restrictive scan erase a HARD requirement recorded by addScreen or an earlier scan, making
+ // isDistributionStrict() depend on child visit order.
+ final DistributionAffinity scanAffinity = prel.getDistributionAffinity();
+ if (distributionAffinity.isLessRestrictiveThan(scanAffinity)) {
+ distributionAffinity = scanAffinity;
+ }
add(prel);
}
@@ -124,6 +131,10 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
}
return w == 1;
}
+
+ /**
+ * @return {@code true} when the affinity recorded for this fragment is {@link DistributionAffinity#HARD}
+ */
+ public boolean isDistributionStrict() {
+ final boolean strict = DistributionAffinity.HARD == distributionAffinity;
+ return strict;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index fa1fe07..e7460b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -34,6 +35,7 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -41,14 +43,18 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.direct.DirectGroupScan;
+import java.io.IOException;
import java.util.List;
/**
@@ -114,7 +120,15 @@ public class FindLimit0Visitor extends RelShuttleImpl {
public static boolean containsLimit0(RelNode rel) {
FindLimit0Visitor visitor = new FindLimit0Visitor();
rel.accept(visitor);
- return visitor.isContains();
+
+ if (!visitor.isContains()) {
+ return false;
+ }
+
+ final FindHardDistributionScans hdVisitor = new FindHardDistributionScans();
+ rel.accept(hdVisitor);
+ // Can't optimize limit 0 if the query contains a table which has hard distribution requirement.
+ return !hdVisitor.contains();
}
private boolean contains = false;
@@ -200,7 +214,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
public final List<TypeProtos.DataMode> dataModes;
public RelDataTypeReader(List<String> columnNames, List<SqlTypeName> columnTypes,
- List<TypeProtos.DataMode> dataModes) {
+ List<TypeProtos.DataMode> dataModes) {
Preconditions.checkArgument(columnNames.size() == columnTypes.size() &&
columnTypes.size() == dataModes.size());
this.columnNames = columnNames;
@@ -234,4 +248,32 @@ public class FindLimit0Visitor extends RelShuttleImpl {
public void close() throws Exception {
}
}
+ /**
+ * Visitor to scan the RelNode tree and find if it contains any Scans that require hard distribution requirements.
+ * After the tree has accepted this visitor, {@link #contains()} is {@code true} if any visited {@link TableScan} is
+ * backed by a Drill table whose GroupScan reports {@link DistributionAffinity#HARD}.
+ */
+ private static class FindHardDistributionScans extends RelShuttleImpl {
+ private boolean contains;
+
+ @Override
+ public RelNode visit(TableScan scan) {
+ if (contains) {
+ // The answer can no longer change; skip further GroupScan lookups.
+ return scan;
+ }
+
+ final DrillTable drillTable = findDrillTable(scan);
+ if (drillTable == null) {
+ // Neither a DrillTable nor a DrillTranslatableTable: there is no GroupScan affinity to inspect
+ // (the previous code dereferenced null here).
+ return scan;
+ }
+
+ try {
+ if (drillTable.getGroupScan().getDistributionAffinity() == DistributionAffinity.HARD) {
+ contains = true;
+ }
+ } catch (final IOException e) {
+ // Keep the I/O failure as the cause so the root problem is not lost.
+ throw new DrillRuntimeException("Failed to get GroupScan from table " + scan.getTable().getQualifiedName() + ".", e);
+ }
+ return scan;
+ }
+
+ /**
+ * @return the DrillTable behind {@code scan}, unwrapping a DrillTranslatableTable when needed, or {@code null}
+ * when the scanned table is neither
+ */
+ private static DrillTable findDrillTable(TableScan scan) {
+ final DrillTable direct = scan.getTable().unwrap(DrillTable.class);
+ if (direct != null) {
+ return direct;
+ }
+ final DrillTranslatableTable translatable = scan.getTable().unwrap(DrillTranslatableTable.class);
+ return translatable != null ? translatable.getDrillTable() : null;
+ }
+
+ /**
+ * @return {@code true} if a scan with HARD distribution affinity was found
+ */
+ public boolean contains() {
+ return contains;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 962e2c6..b77ed23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.sys;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -121,7 +123,7 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
/**
* If distributed, the scan needs to happen on every node. Since width is enforced, the number of fragments equals
- * number of Drillbits. And here we set, endpoint affinities to Double.POSITIVE_INFINITY to ensure every
+ * number of Drillbits. And here we set, each endpoint as mandatory assignment required to ensure every
* Drillbit executes a fragment.
* @return the Drillbit endpoint affinities
*/
@@ -129,8 +131,10 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
public List<EndpointAffinity> getOperatorAffinity() {
if (table.isDistributed()) {
final List<EndpointAffinity> affinities = Lists.newArrayList();
- for (final DrillbitEndpoint endpoint : plugin.getContext().getBits()) {
- affinities.add(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+ final Collection<DrillbitEndpoint> bits = plugin.getContext().getBits();
+ final double affinityPerNode = 1d / bits.size();
+ for (final DrillbitEndpoint endpoint : bits) {
+ affinities.add(new EndpointAffinity(endpoint, affinityPerNode, true, /* maxWidth = */ 1));
}
return affinities;
} else {
@@ -139,6 +143,11 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
}
@Override
+ public DistributionAffinity getDistributionAffinity() {
+ // A distributed system table must be scanned on every Drillbit, so its placement is HARD;
+ // otherwise the placement is only a preference.
+ if (table.isDistributed()) {
+ return DistributionAffinity.HARD;
+ }
+ return DistributionAffinity.SOFT;
+ }
+
+ @Override
public GroupScan clone(List<SchemaPath> columns) {
return this;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..a58404b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.fragment.HardAffinityFragmentParallelizer.INSTANCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link HardAffinityFragmentParallelizer}: width selection and endpoint assignment for fragments whose
+ * endpoint affinities all require an assignment.
+ */
+public class TestHardAffinityFragmentParallelizer {
+
+ // Create a set of test endpoints
+ private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
+ private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node1", 30011);
+ private static final DrillbitEndpoint N2_EP1 = newDrillbitEndpoint("node2", 30010);
+ private static final DrillbitEndpoint N2_EP2 = newDrillbitEndpoint("node2", 30011);
+ private static final DrillbitEndpoint N3_EP1 = newDrillbitEndpoint("node3", 30010);
+ private static final DrillbitEndpoint N3_EP2 = newDrillbitEndpoint("node3", 30011);
+ private static final DrillbitEndpoint N4_EP2 = newDrillbitEndpoint("node4", 30011);
+
+ @Mocked private Fragment fragment;
+ @Mocked private PhysicalOperator root;
+
+ /** Builds an endpoint identified only by its address and control port. */
+ private static DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+ return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+ }
+
+ /**
+ * Builds parallelization parameters with the given slice target, per-node width limit and global width limit.
+ */
+ private static ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
+ final int maxGlobalWidth) {
+ return new ParallelizationParameters() {
+ @Override
+ public long getSliceTarget() {
+ return threshold;
+ }
+
+ @Override
+ public int getMaxWidthPerNode() {
+ return maxWidthPerNode;
+ }
+
+ @Override
+ public int getMaxGlobalWidth() {
+ return maxGlobalWidth;
+ }
+
+ /**
+ * {@link HardAffinityFragmentParallelizer} doesn't use affinity factor.
+ * @return always {@code 0.0}
+ */
+ @Override
+ public double getAffinityFactor() {
+ return 0.0;
+ }
+ };
+ }
+
+ /**
+ * Builds a fragment wrapper whose stats carry HARD distribution affinity plus the given cost, width bounds and
+ * endpoint affinities. The mocked fragment returns the mocked {@code root} operator.
+ */
+ private Wrapper newWrapper(double cost, int minWidth, int maxWidth, List<EndpointAffinity> epAffs) {
+ new NonStrictExpectations() {
+ {
+ fragment.getRoot(); result = root;
+ }
+ };
+
+ final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
+ final Stats stats = fragmentWrapper.getStats();
+ stats.setDistributionAffinity(DistributionAffinity.HARD);
+ stats.addCost(cost);
+ stats.addMinWidth(minWidth);
+ stats.addMaxWidth(maxWidth);
+ stats.addEndpointAffinities(epAffs);
+
+ return fragmentWrapper;
+ }
+
+ @Test
+ public void simpleCase1() throws Exception {
+ final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
+ INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
+
+ // Expect the fragment parallelization to be just one because the cost (200) is below the threshold
+ // (SLICE_TARGET_DEFAULT), which gives a width of 200/10000 = ~1, and there is only one mandatory endpoint.
+ assertEquals(1, wrapper.getWidth());
+
+ final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+ assertEquals(1, assignedEps.size());
+ assertEquals(N1_EP1, assignedEps.get(0));
+ }
+
+ @Test
+ public void simpleCase2() throws Exception {
+ // Set the slice target to 1
+ final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
+ INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
+
+ // Expect the fragment parallelization to be 5:
+ // 1. the cost (200) is above the threshold (slice target 1), which gives 200/1=200 width, and
+ // 2. Max width per node is 5 (limits the width 200 to 5)
+ assertEquals(5, wrapper.getWidth());
+
+ final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+ assertEquals(5, assignedEps.size());
+ for (DrillbitEndpoint ep : assignedEps) {
+ assertEquals(N1_EP1, ep);
+ }
+ }
+
+ @Test
+ public void multiNodeCluster1() throws Exception {
+ final Wrapper wrapper = newWrapper(200, 1, 20,
+ ImmutableList.of(
+ new EndpointAffinity(N1_EP1, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N2_EP1, 0.10, true, MAX_VALUE),
+ new EndpointAffinity(N3_EP2, 0.20, true, MAX_VALUE),
+ new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE)
+ ));
+ INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
+
+ // Expect the fragment parallelization to be 5 because:
+ // 1. The cost (200) is below the threshold (SLICE_TARGET_DEFAULT) (which gives width of 200/10000 = ~1) and
+ // 2. Number of mandatory node assignments are 5 which overrides the cost based width of 1.
+ assertEquals(5, wrapper.getWidth());
+
+ // As there are 5 required eps and the width is 5, everyone gets assigned 1.
+ final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+ assertEquals(5, assignedEps.size());
+ assertTrue(assignedEps.contains(N1_EP1));
+ assertTrue(assignedEps.contains(N1_EP2));
+ assertTrue(assignedEps.contains(N2_EP1));
+ assertTrue(assignedEps.contains(N3_EP2));
+ assertTrue(assignedEps.contains(N4_EP2));
+ }
+
+ @Test
+ public void multiNodeCluster2() throws Exception {
+ final Wrapper wrapper = newWrapper(200, 1, 20,
+ ImmutableList.of(
+ new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+ new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+ new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+ ));
+ INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
+
+ // Expect the fragment parallelization to be 20 because:
+ // 1. the cost (200) is above the threshold (slice target 1), which gives 200/1=200 width, and
+ // 2. Number of mandatory node assignments are 5 (current width 200 satisfies the requirement)
+ // 3. max fragment width is 20 which limits the width
+ assertEquals(20, wrapper.getWidth());
+
+ final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+ assertEquals(20, assignedEps.size());
+ final HashMultiset<DrillbitEndpoint> counts = HashMultiset.create();
+ for (final DrillbitEndpoint ep : assignedEps) {
+ counts.add(ep);
+ }
+ // Each node gets at most 5 (the max width per node).
+ assertTrue(counts.count(N1_EP2) <= 5);
+ assertTrue(counts.count(N2_EP2) <= 5);
+ assertTrue(counts.count(N3_EP1) <= 5);
+ assertTrue(counts.count(N4_EP2) <= 5);
+ assertTrue(counts.count(N1_EP1) <= 5);
+ }
+
+ @Test
+ public void multiNodeClusterNegative1() throws Exception {
+ final Wrapper wrapper = newWrapper(200, 1, 20,
+ ImmutableList.of(
+ new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+ new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+ new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+ ));
+
+ try {
+ INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 2), null);
+ fail("Expected an exception, because max global query width (2) is less than the number of mandatory nodes (5)");
+ } catch (Exception expected) {
+ // ok
+ }
+ }
+
+ @Test
+ public void multiNodeClusterNegative2() throws Exception {
+ final Wrapper wrapper = newWrapper(200, 1, 3,
+ ImmutableList.of(
+ new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+ new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+ new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+ new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+ ));
+
+ try {
+ // Global width (20) and per-node width (2 x 5 nodes = 10) both allow 5 mandatory nodes, so only the fragment
+ // max width (3) can trigger the failure.
+ INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 20), null);
+ fail("Expected an exception, because max fragment width (3) is less than the number of mandatory nodes (5)");
+ } catch (Exception expected) {
+ // ok
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index e86fc28..4c29dbe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -19,11 +19,17 @@ package org.apache.drill.exec.store.sys;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.ExecConstants;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSystemTable extends BaseTestQuery {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
+ @BeforeClass
+ public static void setupMultiNodeCluster() throws Exception {
+ updateTestCluster(3, null);
+ }
+
@Test
public void alterSessionOption() throws Exception {