You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:40 UTC
[7/7] asterixdb git commit: Implements concurrent query management
support.
Implements concurrent query management support.
The following changes are included:
-- factor out JobManager, NodeManager, and ResourceManager from ClusterControllerService;
-- let each application plug in its own IJobCapacityController implementation;
-- let each job specify its required cluster capacity;
-- add a required cluster capacity estimation visitor for optimized query plans;
-- add admission control and queuing for queries, but always execute DDLs and DMLs immediately;
-- add tests for JobManager, NodeManager, ClusterCapacity, ClusterCapacityVisitor, and IJobCapacityController;
-- enlarge the -Xmx setting for ManagixSqlppExecutionTest.
Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1424
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e0c232d2
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e0c232d2
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e0c232d2
Branch: refs/heads/master
Commit: e0c232d2764307e25db767f397a2fd7e577bf338
Parents: 6224966
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Tue Jan 24 09:02:45 2017 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Wed Jan 25 06:45:40 2017 -0800
----------------------------------------------------------------------
.../apache/asterix/api/common/APIFramework.java | 119 +--
.../app/resource/RequiredCapacityVisitor.java | 364 +++++++++
.../bootstrap/CCApplicationEntryPoint.java | 14 +-
.../bootstrap/ClusterLifecycleListener.java | 5 +-
.../bootstrap/GlobalRecoveryManager.java | 3 +-
.../bootstrap/NCApplicationEntryPoint.java | 15 +
.../asterix/messaging/CCMessageBroker.java | 5 +-
.../org/apache/asterix/util/ResourceUtils.java | 70 ++
.../asterix/api/common/APIFrameworkTest.java | 84 ++-
.../resource/RequiredCapacityVisitorTest.java | 174 +++++
.../queries_sqlpp/tpcds/q09/q09.3.query.sqlpp | 3 +
.../common/api/IClusterEventsSubscriber.java | 3 +-
.../integrationts/asterix-configuration.xml | 2 +-
.../cluster/RemoveNodeWorkResponse.java | 3 +-
asterixdb/asterix-runtime/pom.xml | 11 +
.../job/resource/JobCapacityController.java | 76 ++
.../asterix/runtime/util/RuntimeUtils.java | 4 +-
.../job/resource/JobCapacityControllerTest.java | 98 +++
.../server/test/SampleLocalClusterIT.java | 4 +-
.../common/exceptions/AlgebricksException.java | 20 +
...ialFirstRuleCheckFixpointRuleController.java | 2 +-
hyracks-fullstack/hyracks/hyracks-api/pom.xml | 5 +
.../api/application/ICCApplicationContext.java | 11 +-
.../application/ICCApplicationEntryPoint.java | 8 +-
.../application/IClusterLifecycleListener.java | 3 +-
.../application/INCApplicationEntryPoint.java | 10 +-
.../hyracks/api/client/NodeControllerInfo.java | 10 +-
...ionActivityClusterGraphGeneratorFactory.java | 2 -
.../hyracks/api/exceptions/ErrorCode.java | 19 +-
.../api/exceptions/HyracksDataException.java | 80 +-
.../api/exceptions/HyracksException.java | 104 ++-
.../hyracks/api/job/JobSpecification.java | 32 +-
.../org/apache/hyracks/api/job/JobStatus.java | 4 +-
.../api/job/resource/ClusterCapacity.java | 125 ++++
.../resource/DefaultJobCapacityController.java | 40 +
.../api/job/resource/IClusterCapacity.java | 76 ++
.../job/resource/IJobCapacityController.java | 60 ++
.../job/resource/IReadOnlyClusterCapacity.java | 64 ++
.../hyracks/api/job/resource/NodeCapacity.java | 58 ++
.../src/main/resources/errormsg/en.properties | 19 +-
.../api/job/resource/ClusterCapacityTest.java | 85 +++
.../hyracks-control/hyracks-control-cc/pom.xml | 11 +
.../hyracks/control/cc/ClientInterfaceIPCI.java | 13 +-
.../control/cc/ClusterControllerIPCI.java | 6 +-
.../control/cc/ClusterControllerService.java | 84 +--
.../hyracks/control/cc/NodeControllerState.java | 22 +-
.../cc/adminconsole/pages/IndexPage.java | 4 +-
.../cc/adminconsole/pages/JobDetailsPage.java | 2 +-
.../cc/application/CCApplicationContext.java | 3 +-
.../control/cc/cluster/INodeManager.java | 114 +++
.../hyracks/control/cc/cluster/NodeManager.java | 186 +++++
.../cc/executor/ActivityClusterPlanner.java | 448 +++++++++++
.../cc/executor/ActivityPartitionDetails.java | 53 ++
.../control/cc/executor/JobExecutor.java | 723 ++++++++++++++++++
.../cc/executor/PartitionConstraintSolver.java | 129 ++++
.../cc/executor/RankedRunnableTaskCluster.java | 64 ++
.../control/cc/executor/Runnability.java | 105 +++
.../hyracks/control/cc/job/ActivityPlan.java | 2 +-
.../hyracks/control/cc/job/IJobManager.java | 112 +++
.../hyracks/control/cc/job/JobManager.java | 306 ++++++++
.../apache/hyracks/control/cc/job/JobRun.java | 52 +-
.../cc/partitions/PartitionMatchMaker.java | 4 +-
.../control/cc/partitions/PartitionUtils.java | 8 +-
.../cc/scheduler/ActivityClusterPlanner.java | 448 -----------
.../cc/scheduler/ActivityPartitionDetails.java | 53 --
.../control/cc/scheduler/FIFOJobQueue.java | 103 +++
.../hyracks/control/cc/scheduler/IJobQueue.java | 55 ++
.../control/cc/scheduler/IResourceManager.java | 54 ++
.../control/cc/scheduler/JobScheduler.java | 745 -------------------
.../cc/scheduler/PartitionConstraintSolver.java | 129 ----
.../cc/scheduler/RankedRunnableTaskCluster.java | 51 --
.../control/cc/scheduler/ResourceManager.java | 52 ++
.../control/cc/scheduler/Runnability.java | 105 ---
.../control/cc/web/JobsRESTAPIFunction.java | 4 +-
.../control/cc/web/NodesRESTAPIFunction.java | 11 +-
.../hyracks/control/cc/web/WebServer.java | 3 -
.../control/cc/work/AbstractHeartbeatWork.java | 7 +-
.../cc/work/AbstractTaskLifecycleWork.java | 4 +-
.../control/cc/work/CliDeployBinaryWork.java | 15 +-
.../control/cc/work/CliUnDeployBinaryWork.java | 14 +-
.../control/cc/work/ClusterShutdownWork.java | 12 +-
.../control/cc/work/GatherStateDumpsWork.java | 14 +-
.../work/GetActivityClusterGraphJSONWork.java | 18 +-
.../cc/work/GetIpAddressNodeNameMapWork.java | 9 +-
.../hyracks/control/cc/work/GetJobInfoWork.java | 13 +-
.../control/cc/work/GetJobRunJSONWork.java | 23 +-
.../control/cc/work/GetJobStatusWork.java | 14 +-
.../cc/work/GetJobSummariesJSONWork.java | 19 +-
.../cc/work/GetNodeControllersInfoWork.java | 20 +-
.../control/cc/work/GetNodeDetailsJSONWork.java | 25 +-
.../cc/work/GetNodeSummariesJSONWork.java | 16 +-
.../control/cc/work/GetThreadDumpWork.java | 5 +-
.../hyracks/control/cc/work/JobCleanupWork.java | 101 +--
.../hyracks/control/cc/work/JobStartWork.java | 19 +-
.../cc/work/JobletCleanupNotificationWork.java | 32 +-
.../control/cc/work/NotifyDeployBinaryWork.java | 2 +-
.../control/cc/work/NotifyShutdownWork.java | 2 +-
.../control/cc/work/RegisterNodeWork.java | 31 +-
.../work/RegisterPartitionAvailibilityWork.java | 5 +-
.../cc/work/RegisterPartitionRequestWork.java | 5 +-
.../control/cc/work/RemoveDeadNodesWork.java | 57 +-
.../control/cc/work/ReportProfilesWork.java | 14 +-
.../control/cc/work/TaskCompleteWork.java | 6 +-
.../control/cc/work/TaskFailureWork.java | 6 +-
.../control/cc/work/UnregisterNodeWork.java | 14 +-
.../cc/work/WaitForJobCompletionWork.java | 42 +-
.../control/cc/cluster/NodeManagerTest.java | 165 ++++
.../hyracks/control/cc/job/JobManagerTest.java | 238 ++++++
.../control/common/base/INodeController.java | 3 +-
.../control/common/controllers/CCConfig.java | 9 +
.../common/controllers/NodeRegistration.java | 10 +-
.../common/deployment/DeploymentRun.java | 3 +-
.../control/common/ipc/CCNCFunctions.java | 8 +-
.../common/ipc/NodeControllerRemoteProxy.java | 4 +-
.../control/common/shutdown/ShutdownRun.java | 3 +-
.../control/nc/NodeControllerService.java | 9 +-
.../apache/hyracks/control/nc/io/IOManager.java | 2 +-
.../hyracks/control/nc/work/StartTasksWork.java | 8 +-
.../btree/helper/NCApplicationEntryPoint.java | 7 +
.../search/AbstractTOccurrenceSearcher.java | 4 +-
120 files changed, 5008 insertions(+), 2187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 90a8599..c4535cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -61,9 +61,10 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.util.ResourceUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -89,6 +90,7 @@ import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConf
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -238,8 +240,9 @@ public class APIFramework {
int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
compilerProperties.getParallelism());
- builder.setClusterLocations(parallelism == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE
- ? metadataProvider.getClusterLocations() : getComputationLocations(clusterInfoCollector, parallelism));
+ AlgebricksAbsolutePartitionConstraint computationLocations = chooseLocations(clusterInfoCollector, parallelism,
+ metadataProvider.getClusterLocations());
+ builder.setClusterLocations(computationLocations);
ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter());
if (conf.isOptimize()) {
@@ -314,6 +317,14 @@ public class APIFramework {
metadataProvider.isWriteTransaction());
JobSpecification spec = compiler.createJob(AppContextInfo.INSTANCE, jobEventListenerFactory);
+ // When the top-level statement is a query, the statement parameter is null.
+ if (statement == null) {
+ // Sets a required capacity, only for read-only queries.
+ // DDLs and DMLs are considered not that frequent.
+ spec.setRequiredClusterCapacity(ResourceUtils.getRequiredCompacity(plan, computationLocations,
+ sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize));
+ }
+
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(conf, "Hyracks job");
if (rwQ != null) {
@@ -364,54 +375,78 @@ public class APIFramework {
}
}
- // Computes the location constraints based on user-configured parallelism parameter.
- // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large.
- private AlgebricksAbsolutePartitionConstraint getComputationLocations(IClusterInfoCollector clusterInfoCollector,
- int parallelismHint) throws AlgebricksException {
+ // Chooses the location constraints, i.e., whether to use storage parallelism or use a user-specified number
+ // of cores.
+ private AlgebricksAbsolutePartitionConstraint chooseLocations(IClusterInfoCollector clusterInfoCollector,
+ int parallelismHint, AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException {
try {
Map<String, NodeControllerInfo> ncMap = clusterInfoCollector.getNodeControllerInfos();
- // Unifies the handling of non-positive parallelism.
- int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint;
-
- // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger
- // parallelism.
- int numNodes = ncMap.size();
- int numNodesWithOneMorePartition = parallelism % numNodes;
- int perNodeParallelismMin = parallelism / numNodes;
- int perNodeParallelismMax = parallelism / numNodes + 1;
- List<String> allNodes = new ArrayList<>();
- Set<String> selectedNodesWithOneMorePartition = new HashSet<>();
- for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
- allNodes.add(entry.getKey());
- }
- Random random = new Random();
- for (int index = numNodesWithOneMorePartition; index >= 1; --index) {
- int pick = random.nextInt(index);
- selectedNodesWithOneMorePartition.add(allNodes.get(pick));
- Collections.swap(allNodes, pick, index - 1);
- }
+ // Gets total number of cores in the cluster.
+ int totalNumCores = getTotalNumCores(ncMap);
- // Generates cluster locations, which has duplicates for a node if it contains more than one partitions.
- List<String> locations = new ArrayList<>();
- for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
- String nodeId = entry.getKey();
- int numCores = entry.getValue().getNumCores();
- int availableCores = numCores > 1 ? numCores - 1 : numCores; // Reserves one core for heartbeat.
- int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax
- : perNodeParallelismMin;
- int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? nodeParallelism
- : availableCores;
- for (int count = 0; count < coresToUse; ++count) {
- locations.add(nodeId);
- }
+ // If storage parallelism is not larger than the total number of cores, we use the storage parallelism.
+ // Otherwise, we will use all available cores.
+ if (parallelismHint == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE
+ && storageLocations.getLocations().length <= totalNumCores) {
+ return storageLocations;
}
- return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
- } catch (Exception e) {
+ return getComputationLocations(ncMap, parallelismHint);
+ } catch (HyracksException e) {
throw new AlgebricksException(e);
}
}
+ // Computes the location constraints based on user-configured parallelism parameter.
+ // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large.
+ private AlgebricksAbsolutePartitionConstraint getComputationLocations(Map<String, NodeControllerInfo> ncMap,
+ int parallelismHint) {
+ // Unifies the handling of non-positive parallelism.
+ int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint;
+
+ // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger
+ // parallelism.
+ int numNodes = ncMap.size();
+ int numNodesWithOneMorePartition = parallelism % numNodes;
+ int perNodeParallelismMin = parallelism / numNodes;
+ int perNodeParallelismMax = parallelism / numNodes + 1;
+ List<String> allNodes = new ArrayList<>();
+ Set<String> selectedNodesWithOneMorePartition = new HashSet<>();
+ for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+ allNodes.add(entry.getKey());
+ }
+ Random random = new Random();
+ for (int index = numNodesWithOneMorePartition; index >= 1; --index) {
+ int pick = random.nextInt(index);
+ selectedNodesWithOneMorePartition.add(allNodes.get(pick));
+ Collections.swap(allNodes, pick, index - 1);
+ }
+
+ // Generates cluster locations, which has duplicates for a node if it contains more than one partitions.
+ List<String> locations = new ArrayList<>();
+ for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+ String nodeId = entry.getKey();
+ int availableCores = entry.getValue().getNumAvailableCores();
+ int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax
+ : perNodeParallelismMin;
+ int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? nodeParallelism
+ : availableCores;
+ for (int count = 0; count < coresToUse; ++count) {
+ locations.add(nodeId);
+ }
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+ }
+
+ // Gets the total number of available cores in the cluster.
+ private int getTotalNumCores(Map<String, NodeControllerInfo> ncMap) {
+ int sum = 0;
+ for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+ sum += entry.getValue().getNumAvailableCores();
+ }
+ return sum;
+ }
+
// Gets the frame limit.
private int getFrameLimit(String parameter, long memBudgetInConfiguration, int frameSize) {
IPropertyInterpreter<Long> longBytePropertyInterpreter = PropertyInterpreters.getLongBytePropertyInterpreter();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
new file mode 100644
index 0000000..3a6bfee
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+// The current implementation aggregates the memory requirement for each operator.
+// TODO(buyingyi): consider stages for calculating the memory requirement.
+public class RequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+ private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+ private final long numComputationPartitions;
+ private final long groupByMemorySize;
+ private final long joinMemorySize;
+ private final long sortMemorySize;
+ private final long frameSize;
+ private final IClusterCapacity clusterCapacity;
+ private final Set<ILogicalOperator> visitedOperators = new HashSet<>();
+ private long stageMemorySoFar = 0L;
+
+ public RequiredCapacityVisitor(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
+ int joinFrameLimit, int frameSize, IClusterCapacity clusterCapacity) {
+ this.numComputationPartitions = numComputationPartitions;
+ this.frameSize = frameSize;
+ this.groupByMemorySize = groupFrameLimit * (long) frameSize;
+ this.joinMemorySize = joinFrameLimit * (long) frameSize;
+ this.sortMemorySize = sortFrameLimit * (long) frameSize;
+ this.clusterCapacity = clusterCapacity;
+ this.clusterCapacity.setAggregatedCores(1); // At least one core is needed.
+ }
+
+ @Override
+ public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ calculateMemoryUsageForBlockingOperators(op, groupByMemorySize);
+ return null;
+ }
+
+ @Override
+ public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+ return null;
+ }
+
+ @Override
+ public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ calculateMemoryUsageForBlockingOperators(op, sortMemorySize);
+ return null;
+ }
+
+ @Override
+ public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ // Makes sure that the downstream of a replicate operator is only visited once.
+ if (!visitedOperators.contains(op)) {
+ visitedOperators.add(op);
+ visitInternal(op, true);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ // Makes sure that the downstream of a split operator is only visited once.
+ if (!visitedOperators.contains(op)) {
+ visitedOperators.add(op);
+ visitInternal(op, true);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ calculateMemoryUsageForExchange(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ @Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ visitInternal(op, true);
+ return null;
+ }
+
+ // Calculates the memory usage for exchange operators.
+ private void calculateMemoryUsageForExchange(ExchangeOperator op) throws AlgebricksException {
+ visitInternal(op, false);
+ IPhysicalOperator physicalOperator = op.getPhysicalOperator();
+ PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
+ if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
+ || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+ addOutputBuffer(op);
+ return;
+ }
+ stageMemorySoFar += 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions
+ * frameSize;
+ clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+ }
+
+ // Calculates the cluster-wide memory usage for blocking activities like group-by, sort, and join.
+ private void calculateMemoryUsageForBlockingOperators(ILogicalOperator op, long memSize)
+ throws AlgebricksException {
+ visitInternal(op, false);
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ stageMemorySoFar += memSize * numComputationPartitions;
+ } else {
+ stageMemorySoFar += memSize;
+ }
+ clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+ }
+
+ // Recursively visits input operators of an operator and sets the CPU core usage.
+ private void visitInternal(ILogicalOperator op, boolean toAddOuputBuffer) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ inputOpRef.getValue().accept(this, null);
+ }
+ if (toAddOuputBuffer) {
+ addOutputBuffer(op);
+ }
+ setAvailableCores(op);
+ }
+
+ // Adds output buffer for an operator.
+ private void addOutputBuffer(ILogicalOperator op) {
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ stageMemorySoFar += frameSize * numComputationPartitions; // every operator needs one output buffer.
+ } else {
+ stageMemorySoFar += frameSize; // every operator needs one output buffer.
+ }
+ clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+ }
+
+ // Sets the required number of cores to the number of computation partitions for partitioned or local operators.
+ private void setAvailableCores(ILogicalOperator op) {
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ clusterCapacity.setAggregatedCores((int) numComputationPartitions);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 19c00db..5756e7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -30,7 +30,6 @@ import javax.servlet.Servlet;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.api.http.servlet.APIServlet;
-import org.apache.asterix.api.http.servlet.FullAPIServlet;
import org.apache.asterix.api.http.servlet.ClusterAPIServlet;
import org.apache.asterix.api.http.servlet.ClusterCCDetailsAPIServlet;
import org.apache.asterix.api.http.servlet.ClusterNodeDetailsAPIServlet;
@@ -38,6 +37,7 @@ import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
import org.apache.asterix.api.http.servlet.DDLAPIServlet;
import org.apache.asterix.api.http.servlet.DiagnosticsAPIServlet;
import org.apache.asterix.api.http.servlet.FeedServlet;
+import org.apache.asterix.api.http.servlet.FullAPIServlet;
import org.apache.asterix.api.http.servlet.QueryAPIServlet;
import org.apache.asterix.api.http.servlet.QueryResultAPIServlet;
import org.apache.asterix.api.http.servlet.QueryServiceServlet;
@@ -47,8 +47,8 @@ import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
import org.apache.asterix.api.http.servlet.VersionAPIServlet;
-import org.apache.asterix.app.cc.ResourceIdManager;
import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.cc.ResourceIdManager;
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.config.AsterixExtension;
@@ -62,11 +62,13 @@ import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
+import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -85,6 +87,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private static IAsterixStateProxy proxy;
protected ICCApplicationContext appCtx;
protected CompilerExtensionManager ccExtensionManager;
+ private IJobCapacityController jobCapacityController;
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
@@ -131,6 +134,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
ccAppCtx.setMessageBroker(messageBroker);
+
+ jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
}
protected List<AsterixExtension> getExtensions() {
@@ -330,6 +335,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
}
+ @Override
+ public IJobCapacityController getJobCapacityController() {
+ return jobCapacityController;
+ }
+
public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) {
CCApplicationEntryPoint.proxy = proxy;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 7a4ff13..41d8b0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -19,6 +19,7 @@
package org.apache.asterix.hyracks.bootstrap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -93,7 +94,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
}
@Override
- public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
+ public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
for (String deadNode : deadNodeIds) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
@@ -118,7 +119,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
}
}
- private void updateProgress(ClusterEventType eventType, Set<String> nodeIds) {
+ private void updateProgress(ClusterEventType eventType, Collection<String> nodeIds) {
List<IClusterManagementWorkResponse> completedResponses = new ArrayList<IClusterManagementWorkResponse>();
boolean isComplete = false;
for (IClusterManagementWorkResponse resp : pendingWorkResponses) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index f64f998..d437b5b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.hyracks.bootstrap;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -59,7 +60,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
}
@Override
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+ public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) {
setState(ClusterStateManager.INSTANCE.getState());
ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(false);
return Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8998c6b..bc270df 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
@@ -51,6 +52,7 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
@@ -261,6 +263,19 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
performLocalCleanUp();
}
+ @Override
+ public NodeCapacity getCapacity() {
+ IPropertiesProvider propertiesProvider = (IPropertiesProvider) runtimeContext;
+ StorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ // Deducts the reserved buffer cache size and memory component size from the maximum heap size,
+ // and deducts one core for processing heartbeats.
+ long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize()
+ - storageProperties.getMemoryComponentGlobalBudget();
+ int allCores = Runtime.getRuntime().availableProcessors();
+ int maximumCoresForComputation = allCores > 1 ? allCores - 1 : allCores;
+ return new NodeCapacity(memorySize, maximumCoresForComputation);
+ }
+
private void performLocalCleanUp() {
//Delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d1d7ff7..d785cce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
public class CCMessageBroker implements ICCMessageBroker {
@@ -49,8 +50,8 @@ public class CCMessageBroker implements ICCMessageBroker {
@Override
public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- NodeControllerState state = nodeMap.get(nodeId);
+ INodeManager nodeManager = ccs.getNodeManager();
+ NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
new file mode 100644
index 0000000..50a21bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.app.resource.RequiredCapacityVisitor;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+public class ResourceUtils {
+
+ private ResourceUtils() {
+ }
+
+ /**
+ * Calculates the required cluster capacity from a given query plan, the computation locations,
+ * the operator memory budgets, and frame size.
+ *
+ * @param plan
+ * a given query plan.
+ * @param computationLocations
+ * the partitions for computation.
+ * @param sortFrameLimit
+ * the frame limit for one sorter partition.
+ * @param groupFrameLimit
+ * the frame limit for one group-by partition.
+ * @param joinFrameLimit
+ * the frame limit for one joiner partition.
+ * @param frameSize
+ * the frame size used in query execution.
+ * @return the required cluster capacity for executing the query.
+ * @throws AlgebricksException
+ * if the query plan is malformed.
+ */
+ public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan,
+ AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
+ int joinFrameLimit, int frameSize)
+ throws AlgebricksException {
+ // Creates a cluster capacity visitor.
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
+ sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity);
+
+ // There could be only one root operator for a top-level query plan.
+ ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
+ rootOp.accept(visitor, null);
+ return clusterCapacity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
index a79053e..e041021 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
@@ -27,8 +27,10 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
+import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.junit.Assert;
@@ -39,15 +41,16 @@ import junit.extensions.PA;
public class APIFrameworkTest {
@Test
- public void testGetComputationLocations() throws Exception {
+ public void testChooseLocations() throws Exception {
+ // Mocks cluster info collector.
IClusterInfoCollector clusterInfoCollector = mock(IClusterInfoCollector.class);
// Constructs mocked cluster nodes.
Map<String, NodeControllerInfo> map = new HashMap<>();
NodeControllerInfo nc1Info = mock(NodeControllerInfo.class);
- when(nc1Info.getNumCores()).thenReturn(4);
+ when(nc1Info.getNumAvailableCores()).thenReturn(1);
NodeControllerInfo nc2Info = mock(NodeControllerInfo.class);
- when(nc2Info.getNumCores()).thenReturn(4);
+ when(nc2Info.getNumAvailableCores()).thenReturn(1);
String nc1 = "nc1";
String nc2 = "nc2";
map.put(nc1, nc1Info);
@@ -57,10 +60,56 @@ public class APIFrameworkTest {
// Creates an APIFramework.
APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class));
+ // Tests large storage locations.
+ AlgebricksAbsolutePartitionConstraint storageLocations = new AlgebricksAbsolutePartitionConstraint(
+ new String[] { "node1", "node1", "node2" });
+ AlgebricksAbsolutePartitionConstraint computationLocations = (AlgebricksAbsolutePartitionConstraint) PA
+ .invokeMethod(apiFramework,
+ "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+ + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+ clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+ Assert.assertTrue(computationLocations.getLocations().length == 2);
+
+ // Tests suitable storage locations.
+ storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1", "node2" });
+ computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
+ "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+ + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+ clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+ Assert.assertTrue(computationLocations.getLocations().length == 2);
+
+ // Tests small storage locations.
+ storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1" });
+ computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
+ "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+ + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+ clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+ Assert.assertTrue(computationLocations.getLocations().length == 1);
+
+ // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in
+ // APIFramework.chooseLocations(...).
+ verify(clusterInfoCollector, times(3)).getNodeControllerInfos();
+ }
+
+ @Test
+ public void testGetComputationLocations() throws AlgebricksException {
+ // Constructs mocked cluster nodes.
+ Map<String, NodeControllerInfo> map = new HashMap<>();
+ NodeControllerInfo nc1Info = mock(NodeControllerInfo.class);
+ when(nc1Info.getNumAvailableCores()).thenReturn(4);
+ NodeControllerInfo nc2Info = mock(NodeControllerInfo.class);
+ when(nc2Info.getNumAvailableCores()).thenReturn(4);
+ String nc1 = "nc1";
+ String nc2 = "nc2";
+ map.put(nc1, nc1Info);
+ map.put(nc2, nc2Info);
+
+ // Creates an APIFramework.
+ APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class));
+
// Tests odd number parallelism.
AlgebricksAbsolutePartitionConstraint loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(
- apiFramework, "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)",
- clusterInfoCollector, 5);
+ apiFramework, "getComputationLocations(java.util.Map,int)", map, 5);
int nc1Count = 0, nc2Count = 0;
String[] partitions = loc.getLocations();
for (String partition : partitions) {
@@ -78,7 +127,7 @@ public class APIFrameworkTest {
// Tests even number parallelism.
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
- "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 8);
+ "getComputationLocations(java.util.Map,int)", map, 8);
nc1Count = 0;
nc2Count = 0;
partitions = loc.getLocations();
@@ -93,40 +142,35 @@ public class APIFrameworkTest {
Assert.assertTrue(nc1Count > 0);
Assert.assertTrue(nc2Count > 0);
Assert.assertTrue(Math.abs(nc1Count - nc2Count) == 0); // Tests load balance.
- // The maximum parallelism cannot be beyond n *(#core-1), where n is the number of NCs and #core is the number
+ // The maximum parallelism cannot be beyond n * #core, where n is the number of NCs and #core is the number
// of cores per NC.
- Assert.assertTrue(partitions.length == 6);
+ Assert.assertTrue(partitions.length == 8);
// Tests the case when parallelism is one.
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
- "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 1);
+ "getComputationLocations(java.util.Map,int)", map, 1);
Assert.assertTrue(loc.getLocations().length == 1);
// Tests the case when parallelism is a negative.
// In this case, the compiler has no idea and falls back to the default setting where all possible cores
// are used.
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
- "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector,
- -100);
- Assert.assertTrue(loc.getLocations().length == 6);
+ "getComputationLocations(java.util.Map,int)", map, -100);
+ Assert.assertTrue(loc.getLocations().length == 8);
// Tests the case when parallelism is -1.
// In this case, the compiler has no idea and falls back to the default setting where all possible cores
// are used.
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
- "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, -1);
- Assert.assertTrue(loc.getLocations().length == 6);
+ "getComputationLocations(java.util.Map,int)", map, -1);
+ Assert.assertTrue(loc.getLocations().length == 8);
// Tests the case when parallelism is zero.
// In this case, the compiler has no idea and falls back to the default setting where all possible cores
// are used.
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
- "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 0);
- Assert.assertTrue(loc.getLocations().length == 6);
-
- // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in
- // APIFramework.getComputationLocations(...).
- verify(clusterInfoCollector, times(6)).getNodeControllerInfos();
+ "getComputationLocations(java.util.Map,int)", map, 0);
+ Assert.assertTrue(loc.getLocations().length == 8);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
new file mode 100644
index 0000000..cc18c31
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import java.util.Collections;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RequiredCapacityVisitorTest {
+
+ private static final long MEMORY_BUDGET = 33554432L;
+ private static final int FRAME_SIZE = 32768;
+ private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
+ private static final int PARALLELISM = 10;
+
+ @Test
+ public void testParallelGroupBy() throws AlgebricksException {
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+ // Constructs a parallel group-by query plan.
+ GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ ExchangeOperator exchange = new ExchangeOperator();
+ exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
+ globalGby.getInputs().add(new MutableObject<>(exchange));
+ exchange.getInputs().add(new MutableObject<>(localGby));
+
+ // Verifies the calculated cluster capacity requirement for the test query plan.
+ globalGby.accept(visitor, null);
+ Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM);
+ Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET * PARALLELISM
+ + 2 * FRAME_SIZE * PARALLELISM * PARALLELISM);
+ }
+
+ @Test
+ public void testUnPartitionedGroupBy() throws AlgebricksException {
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+ // Constructs an unpartitioned group-by query plan.
+ GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ ExchangeOperator exchange = new ExchangeOperator();
+ exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ globalGby.getInputs().add(new MutableObject<>(exchange));
+ exchange.getInputs().add(new MutableObject<>(localGby));
+
+ // Verifies the calculated cluster capacity requirement for the test query plan.
+ globalGby.accept(visitor, null);
+ Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1);
+ Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET + FRAME_SIZE);
+ }
+
+ @Test
+ public void testParallelJoin() throws AlgebricksException {
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+ // Constructs a join query plan.
+ InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+
+ // Left child plan of the join.
+ ExchangeOperator leftChildExchange = new ExchangeOperator();
+ leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ join.getInputs().add(new MutableObject<>(leftChildExchange));
+ leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
+ EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+ ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ leftChild.getInputs().add(new MutableObject<>(ets));
+ leftChild.getInputs().add(new MutableObject<>(ets));
+
+ // Right child plan of the join.
+ ExchangeOperator rightChildExchange = new ExchangeOperator();
+ rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
+ join.getInputs().add(new MutableObject<>(rightChildExchange));
+ rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
+ rightChild.getInputs().add(new MutableObject<>(ets));
+
+ // Verifies the calculated cluster capacity requirement for the test query plan.
+ join.accept(visitor, null);
+ Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM);
+ Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET * PARALLELISM
+ + 2 * 2L * PARALLELISM * PARALLELISM * FRAME_SIZE + 3 * FRAME_SIZE * PARALLELISM);
+ }
+
+ @Test
+ public void testUnPartitionedJoin() throws AlgebricksException {
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+ // Constructs a join query plan.
+ InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+
+ // Left child plan of the join.
+ ExchangeOperator leftChildExchange = new ExchangeOperator();
+ leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ leftChildExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ join.getInputs().add(new MutableObject<>(leftChildExchange));
+ leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
+ EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+ ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ leftChild.getInputs().add(new MutableObject<>(ets));
+ leftChild.getInputs().add(new MutableObject<>(ets));
+
+ // Right child plan of the join.
+ ExchangeOperator rightChildExchange = new ExchangeOperator();
+ rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ rightChildExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ join.getInputs().add(new MutableObject<>(rightChildExchange));
+ rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
+ rightChild.getInputs().add(new MutableObject<>(ets));
+
+ // Verifies the calculated cluster capacity requirement for the test query plan.
+ join.accept(visitor, null);
+ Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1);
+ Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET + 5L * FRAME_SIZE);
+ }
+
+ private RequiredCapacityVisitor makeComputationCapacityVisitor(int numComputationPartitions,
+ IClusterCapacity clusterCapacity) {
+ return new RequiredCapacityVisitor(numComputationPartitions, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE,
+ clusterCapacity);
+ }
+
+ private GroupByOperator makeGroupByOperator(AbstractLogicalOperator.ExecutionMode exeMode) {
+ GroupByOperator groupByOperator = new GroupByOperator();
+ groupByOperator.setExecutionMode(exeMode);
+ return groupByOperator;
+ }
+
+ private InnerJoinOperator makeJoinOperator(AbstractLogicalOperator.ExecutionMode exeMode) {
+ InnerJoinOperator joinOperator = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+ joinOperator.setExecutionMode(exeMode);
+ return joinOperator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
index 774747e..e0ae61c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
@@ -20,6 +20,9 @@
use tpcds;
+set `compiler.joinmemory` "4MB"
+set `compiler.groupmemory` "4MB"
+
select case when (select value count(ss)
from store_sales ss
where ss_quantity >= 1 and ss_quantity <= 20) > 25437
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
index 3a98b82..fef4e31 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
@@ -18,6 +18,7 @@ package org.apache.asterix.common.api;
* specific language governing permissions and limitations
* under the License.
*/
+import java.util.Collection;
import java.util.Set;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -28,7 +29,7 @@ public interface IClusterEventsSubscriber {
* @param deadNodeIds
* @return
*/
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
+ public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds);
/**
* @param joinedNodeId
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
index 9992009..a5ecc6b 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
@@ -20,7 +20,7 @@
<property>
<name>nc.java.opts</name>
- <value>-Xmx3096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value>
+ <value>-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value>
<description>JVM parameters for each Node Contoller (NC)</description>
</property>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
index 580aab7..34b873c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.metadata.cluster;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -30,7 +31,7 @@ public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse {
nodesToBeRemoved.addAll(w.getNodesToBeRemoved());
}
- public boolean updateProgress(Set<String> failedNodeIds) {
+ public boolean updateProgress(Collection<String> failedNodeIds) {
nodesToBeRemoved.removeAll(failedNodeIds);
return nodesToBeRemoved.isEmpty();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index 1ccdc76..6458fb0 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -140,5 +140,16 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
new file mode 100644
index 0000000..8ea1fa7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+
+ // Admits, queues, or rejects jobs by comparing their required capacity against aggregated cluster resources;
+ // aggregated information is used to avoid the computation cost of checking the capacity constraint for each node.
+ // TODO(buyingyi): investigate partition-aware resource control.
+ public class JobCapacityController implements IJobCapacityController {
+ // NOTE(review): allocate/release read and then write the current capacity without any locking in this class; presumably callers serialize access -- confirm.
+ private final IResourceManager resourceManager; // Source of the maximum and the current cluster capacity.
+
+ public JobCapacityController(IResourceManager resourceManager) {
+ this.resourceManager = resourceManager;
+ }
+ // Returns EXECUTE after reserving the job's required capacity, QUEUE if it must wait, and throws if it can never fit.
+ @Override
+ public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+ IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
+ long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
+ int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
+ IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+ if (!(reqAggregatedMemoryByteSize <= maximumCapacity.getAggregatedMemoryByteSize()
+ && reqAggregatedNumCores <= maximumCapacity.getAggregatedCores())) {
+ throw HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, requiredCapacity.toString(),
+ maximumCapacity.toString()); // Memory or cores exceed the maximum capacity: reject the job.
+ }
+ IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+ long currentAggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize();
+ int currentAggregatedAvailableCores = currentCapacity.getAggregatedCores();
+ if (!(reqAggregatedMemoryByteSize <= currentAggregatedMemoryByteSize
+ && reqAggregatedNumCores <= currentAggregatedAvailableCores)) {
+ return JobSubmissionStatus.QUEUE; // Within the maximum, but more than the current capacity offers.
+ }
+ currentCapacity.setAggregatedMemoryByteSize(currentAggregatedMemoryByteSize - reqAggregatedMemoryByteSize);
+ currentCapacity.setAggregatedCores(currentAggregatedAvailableCores - reqAggregatedNumCores);
+ return JobSubmissionStatus.EXECUTE; // The required memory and cores were deducted from the current capacity above.
+ }
+ // Adds the memory and cores that the job required back to the current capacity.
+ @Override
+ public void release(JobSpecification job) {
+ IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
+ long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
+ int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
+ IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+ long aggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize();
+ int aggregatedNumCores = currentCapacity.getAggregatedCores();
+ currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize);
+ currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
index ed93a2c..608def7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
/**
* Utility class for obtaining information on the set of Hyracks NodeController
@@ -61,6 +62,7 @@ public class RuntimeUtils {
public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
ClusterControllerService ccs = (ClusterControllerService) AppContextInfo.INSTANCE
.getCCApplicationContext().getControllerService();
- map.putAll(ccs.getIpAddressNodeNameMap());
+ INodeManager nodeManager = ccs.getNodeManager();
+ map.putAll(nodeManager.getIpAddressNodeNameMap());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
new file mode 100644
index 0000000..4a63885
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.job.resource;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.cc.scheduler.ResourceManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobCapacityControllerTest {
+ // Exercises JobCapacityController against a ResourceManager updated with one node: 4294967296 bytes (4 GiB), 33 cores.
+ @Test
+ public void test() throws HyracksException {
+ IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33);
+ JobCapacityController capacityController = new JobCapacityController(resourceManager);
+ // Verifies the correctness of the allocate method.
+ // The first job (4 GiB, 16 cores) is expected to be admitted; the next two (2 GiB each) are expected to be queued.
+ Assert.assertTrue(capacityController.allocate(
+ makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ Assert.assertTrue(capacityController.allocate(
+ makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ Assert.assertTrue(capacityController.allocate(
+ makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ // A job asking for 64 cores exceeds the 33 cores given to the resource manager and is expected to be rejected.
+ boolean exceedCapacity = false;
+ try {
+ capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+ } catch (HyracksException e) {
+ exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY;
+ }
+ Assert.assertTrue(exceedCapacity); // The rejection must carry JOB_REQUIREMENTS_EXCEED_CAPACITY.
+ Assert.assertTrue(capacityController.allocate(
+ makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ exceedCapacity = false; // Reset the flag before the second rejection check.
+ try {
+ capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33)); // One byte more than 4 GiB.
+ } catch (HyracksException e) {
+ exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY;
+ }
+ Assert.assertTrue(exceedCapacity);
+
+ // Verifies that the release method does not leak resources: after releasing the admitted job, the full capacity is expected back.
+ capacityController.release(makeJobWithRequiredCapacity(4294967296L, 16));
+ Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize() == 4294967296L);
+ Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedCores() == 33);
+ }
+ // Creates a ResourceManager and updates it with a single node, "node1", of the given memory size and core count.
+ private IResourceManager makeResourceManagerWithCapacity(long memorySize, int cores) throws HyracksException {
+ IResourceManager resourceManager = new ResourceManager();
+ resourceManager.update("node1", new NodeCapacity(memorySize, cores));
+ return resourceManager;
+ }
+ // Creates a mocked JobSpecification whose getRequiredClusterCapacity() returns the given memory size and cores.
+ private JobSpecification makeJobWithRequiredCapacity(long memorySize, int cores) {
+ // Generates cluster capacity.
+ IClusterCapacity clusterCapacity = makeComputationCapacity(memorySize, cores);
+
+ // Generates a job.
+ JobSpecification job = mock(JobSpecification.class);
+ when(job.getRequiredClusterCapacity()).thenReturn(clusterCapacity);
+ return job;
+ }
+ // Creates a ClusterCapacity with the given aggregated memory byte size and aggregated core count.
+ private IClusterCapacity makeComputationCapacity(long memorySize, int cores) {
+ IClusterCapacity clusterCapacity = new ClusterCapacity();
+ clusterCapacity.setAggregatedMemoryByteSize(memorySize);
+ clusterCapacity.setAggregatedCores(cores);
+ return clusterCapacity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index 4a5f90f..83d421f 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -78,9 +78,11 @@ public class SampleLocalClusterIT {
@Test
public void test0_startCluster() throws Exception {
- Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh"))
+ Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
.inheritIO().start();
Assert.assertEquals(0, process.waitFor());
+ process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
+ Assert.assertEquals(0, process.waitFor());
}
@Test