You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:40 UTC

[7/7] asterixdb git commit: Implements concurrent query management support.

Implements concurrent query management support.

The following changes are included:
-- factor out JobManager, NodeManager, and ResourceManager from ClusterControllerService;
-- let each application plug in its own IJobCapacityController implementation;
-- let each job specify its required cluster capacity;
-- add a required cluster capacity estimation visitor for optimized query plans;
-- add admission control and queuing for queries, but always execute DDLs and DMLs immediately;
-- add tests for JobManager, NodeManager, ClusterCapacity, ClusterCapacityVisitor, and IJobCapacityController;
-- enlarge the -Xmx setting for ManagixSqlppExecutionTest.

Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1424
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e0c232d2
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e0c232d2
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e0c232d2

Branch: refs/heads/master
Commit: e0c232d2764307e25db767f397a2fd7e577bf338
Parents: 6224966
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Tue Jan 24 09:02:45 2017 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Wed Jan 25 06:45:40 2017 -0800

----------------------------------------------------------------------
 .../apache/asterix/api/common/APIFramework.java | 119 +--
 .../app/resource/RequiredCapacityVisitor.java   | 364 +++++++++
 .../bootstrap/CCApplicationEntryPoint.java      |  14 +-
 .../bootstrap/ClusterLifecycleListener.java     |   5 +-
 .../bootstrap/GlobalRecoveryManager.java        |   3 +-
 .../bootstrap/NCApplicationEntryPoint.java      |  15 +
 .../asterix/messaging/CCMessageBroker.java      |   5 +-
 .../org/apache/asterix/util/ResourceUtils.java  |  70 ++
 .../asterix/api/common/APIFrameworkTest.java    |  84 ++-
 .../resource/RequiredCapacityVisitorTest.java   | 174 +++++
 .../queries_sqlpp/tpcds/q09/q09.3.query.sqlpp   |   3 +
 .../common/api/IClusterEventsSubscriber.java    |   3 +-
 .../integrationts/asterix-configuration.xml     |   2 +-
 .../cluster/RemoveNodeWorkResponse.java         |   3 +-
 asterixdb/asterix-runtime/pom.xml               |  11 +
 .../job/resource/JobCapacityController.java     |  76 ++
 .../asterix/runtime/util/RuntimeUtils.java      |   4 +-
 .../job/resource/JobCapacityControllerTest.java |  98 +++
 .../server/test/SampleLocalClusterIT.java       |   4 +-
 .../common/exceptions/AlgebricksException.java  |  20 +
 ...ialFirstRuleCheckFixpointRuleController.java |   2 +-
 hyracks-fullstack/hyracks/hyracks-api/pom.xml   |   5 +
 .../api/application/ICCApplicationContext.java  |  11 +-
 .../application/ICCApplicationEntryPoint.java   |   8 +-
 .../application/IClusterLifecycleListener.java  |   3 +-
 .../application/INCApplicationEntryPoint.java   |  10 +-
 .../hyracks/api/client/NodeControllerInfo.java  |  10 +-
 ...ionActivityClusterGraphGeneratorFactory.java |   2 -
 .../hyracks/api/exceptions/ErrorCode.java       |  19 +-
 .../api/exceptions/HyracksDataException.java    |  80 +-
 .../api/exceptions/HyracksException.java        | 104 ++-
 .../hyracks/api/job/JobSpecification.java       |  32 +-
 .../org/apache/hyracks/api/job/JobStatus.java   |   4 +-
 .../api/job/resource/ClusterCapacity.java       | 125 ++++
 .../resource/DefaultJobCapacityController.java  |  40 +
 .../api/job/resource/IClusterCapacity.java      |  76 ++
 .../job/resource/IJobCapacityController.java    |  60 ++
 .../job/resource/IReadOnlyClusterCapacity.java  |  64 ++
 .../hyracks/api/job/resource/NodeCapacity.java  |  58 ++
 .../src/main/resources/errormsg/en.properties   |  19 +-
 .../api/job/resource/ClusterCapacityTest.java   |  85 +++
 .../hyracks-control/hyracks-control-cc/pom.xml  |  11 +
 .../hyracks/control/cc/ClientInterfaceIPCI.java |  13 +-
 .../control/cc/ClusterControllerIPCI.java       |   6 +-
 .../control/cc/ClusterControllerService.java    |  84 +--
 .../hyracks/control/cc/NodeControllerState.java |  22 +-
 .../cc/adminconsole/pages/IndexPage.java        |   4 +-
 .../cc/adminconsole/pages/JobDetailsPage.java   |   2 +-
 .../cc/application/CCApplicationContext.java    |   3 +-
 .../control/cc/cluster/INodeManager.java        | 114 +++
 .../hyracks/control/cc/cluster/NodeManager.java | 186 +++++
 .../cc/executor/ActivityClusterPlanner.java     | 448 +++++++++++
 .../cc/executor/ActivityPartitionDetails.java   |  53 ++
 .../control/cc/executor/JobExecutor.java        | 723 ++++++++++++++++++
 .../cc/executor/PartitionConstraintSolver.java  | 129 ++++
 .../cc/executor/RankedRunnableTaskCluster.java  |  64 ++
 .../control/cc/executor/Runnability.java        | 105 +++
 .../hyracks/control/cc/job/ActivityPlan.java    |   2 +-
 .../hyracks/control/cc/job/IJobManager.java     | 112 +++
 .../hyracks/control/cc/job/JobManager.java      | 306 ++++++++
 .../apache/hyracks/control/cc/job/JobRun.java   |  52 +-
 .../cc/partitions/PartitionMatchMaker.java      |   4 +-
 .../control/cc/partitions/PartitionUtils.java   |   8 +-
 .../cc/scheduler/ActivityClusterPlanner.java    | 448 -----------
 .../cc/scheduler/ActivityPartitionDetails.java  |  53 --
 .../control/cc/scheduler/FIFOJobQueue.java      | 103 +++
 .../hyracks/control/cc/scheduler/IJobQueue.java |  55 ++
 .../control/cc/scheduler/IResourceManager.java  |  54 ++
 .../control/cc/scheduler/JobScheduler.java      | 745 -------------------
 .../cc/scheduler/PartitionConstraintSolver.java | 129 ----
 .../cc/scheduler/RankedRunnableTaskCluster.java |  51 --
 .../control/cc/scheduler/ResourceManager.java   |  52 ++
 .../control/cc/scheduler/Runnability.java       | 105 ---
 .../control/cc/web/JobsRESTAPIFunction.java     |   4 +-
 .../control/cc/web/NodesRESTAPIFunction.java    |  11 +-
 .../hyracks/control/cc/web/WebServer.java       |   3 -
 .../control/cc/work/AbstractHeartbeatWork.java  |   7 +-
 .../cc/work/AbstractTaskLifecycleWork.java      |   4 +-
 .../control/cc/work/CliDeployBinaryWork.java    |  15 +-
 .../control/cc/work/CliUnDeployBinaryWork.java  |  14 +-
 .../control/cc/work/ClusterShutdownWork.java    |  12 +-
 .../control/cc/work/GatherStateDumpsWork.java   |  14 +-
 .../work/GetActivityClusterGraphJSONWork.java   |  18 +-
 .../cc/work/GetIpAddressNodeNameMapWork.java    |   9 +-
 .../hyracks/control/cc/work/GetJobInfoWork.java |  13 +-
 .../control/cc/work/GetJobRunJSONWork.java      |  23 +-
 .../control/cc/work/GetJobStatusWork.java       |  14 +-
 .../cc/work/GetJobSummariesJSONWork.java        |  19 +-
 .../cc/work/GetNodeControllersInfoWork.java     |  20 +-
 .../control/cc/work/GetNodeDetailsJSONWork.java |  25 +-
 .../cc/work/GetNodeSummariesJSONWork.java       |  16 +-
 .../control/cc/work/GetThreadDumpWork.java      |   5 +-
 .../hyracks/control/cc/work/JobCleanupWork.java | 101 +--
 .../hyracks/control/cc/work/JobStartWork.java   |  19 +-
 .../cc/work/JobletCleanupNotificationWork.java  |  32 +-
 .../control/cc/work/NotifyDeployBinaryWork.java |   2 +-
 .../control/cc/work/NotifyShutdownWork.java     |   2 +-
 .../control/cc/work/RegisterNodeWork.java       |  31 +-
 .../work/RegisterPartitionAvailibilityWork.java |   5 +-
 .../cc/work/RegisterPartitionRequestWork.java   |   5 +-
 .../control/cc/work/RemoveDeadNodesWork.java    |  57 +-
 .../control/cc/work/ReportProfilesWork.java     |  14 +-
 .../control/cc/work/TaskCompleteWork.java       |   6 +-
 .../control/cc/work/TaskFailureWork.java        |   6 +-
 .../control/cc/work/UnregisterNodeWork.java     |  14 +-
 .../cc/work/WaitForJobCompletionWork.java       |  42 +-
 .../control/cc/cluster/NodeManagerTest.java     | 165 ++++
 .../hyracks/control/cc/job/JobManagerTest.java  | 238 ++++++
 .../control/common/base/INodeController.java    |   3 +-
 .../control/common/controllers/CCConfig.java    |   9 +
 .../common/controllers/NodeRegistration.java    |  10 +-
 .../common/deployment/DeploymentRun.java        |   3 +-
 .../control/common/ipc/CCNCFunctions.java       |   8 +-
 .../common/ipc/NodeControllerRemoteProxy.java   |   4 +-
 .../control/common/shutdown/ShutdownRun.java    |   3 +-
 .../control/nc/NodeControllerService.java       |   9 +-
 .../apache/hyracks/control/nc/io/IOManager.java |   2 +-
 .../hyracks/control/nc/work/StartTasksWork.java |   8 +-
 .../btree/helper/NCApplicationEntryPoint.java   |   7 +
 .../search/AbstractTOccurrenceSearcher.java     |   4 +-
 120 files changed, 5008 insertions(+), 2187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 90a8599..c4535cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -61,9 +61,10 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.util.AppContextInfo;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.util.ResourceUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -89,6 +90,7 @@ import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConf
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -238,8 +240,9 @@ public class APIFramework {
 
         int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
                 compilerProperties.getParallelism());
-        builder.setClusterLocations(parallelism == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE
-                ? metadataProvider.getClusterLocations() : getComputationLocations(clusterInfoCollector, parallelism));
+        AlgebricksAbsolutePartitionConstraint computationLocations = chooseLocations(clusterInfoCollector, parallelism,
+                metadataProvider.getClusterLocations());
+        builder.setClusterLocations(computationLocations);
 
         ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter());
         if (conf.isOptimize()) {
@@ -314,6 +317,14 @@ public class APIFramework {
                 metadataProvider.isWriteTransaction());
         JobSpecification spec = compiler.createJob(AppContextInfo.INSTANCE, jobEventListenerFactory);
 
+        // When the top-level statement is a query, the statement parameter is null.
+        if (statement == null) {
+            // Sets a required capacity, only for read-only queries.
+            // DDLs and DMLs are considered not that frequent.
+            spec.setRequiredClusterCapacity(ResourceUtils.getRequiredCompacity(plan, computationLocations,
+                    sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize));
+        }
+
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
             printPlanPrefix(conf, "Hyracks job");
             if (rwQ != null) {
@@ -364,54 +375,78 @@ public class APIFramework {
         }
     }
 
-    // Computes the location constraints based on user-configured parallelism parameter.
-    // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large.
-    private AlgebricksAbsolutePartitionConstraint getComputationLocations(IClusterInfoCollector clusterInfoCollector,
-            int parallelismHint) throws AlgebricksException {
+    // Chooses the location constraints, i.e., whether to use storage parallelism or use a user-specified number
+    // of cores.
+    private AlgebricksAbsolutePartitionConstraint chooseLocations(IClusterInfoCollector clusterInfoCollector,
+            int parallelismHint, AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException {
         try {
             Map<String, NodeControllerInfo> ncMap = clusterInfoCollector.getNodeControllerInfos();
 
-            // Unifies the handling of non-positive parallelism.
-            int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint;
-
-            // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger
-            // parallelism.
-            int numNodes = ncMap.size();
-            int numNodesWithOneMorePartition = parallelism % numNodes;
-            int perNodeParallelismMin = parallelism / numNodes;
-            int perNodeParallelismMax = parallelism / numNodes + 1;
-            List<String> allNodes = new ArrayList<>();
-            Set<String> selectedNodesWithOneMorePartition = new HashSet<>();
-            for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
-                allNodes.add(entry.getKey());
-            }
-            Random random = new Random();
-            for (int index = numNodesWithOneMorePartition; index >= 1; --index) {
-                int pick = random.nextInt(index);
-                selectedNodesWithOneMorePartition.add(allNodes.get(pick));
-                Collections.swap(allNodes, pick, index - 1);
-            }
+            // Gets total number of cores in the cluster.
+            int totalNumCores = getTotalNumCores(ncMap);
 
-            // Generates cluster locations, which has duplicates for a node if it contains more than one partitions.
-            List<String> locations = new ArrayList<>();
-            for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
-                String nodeId = entry.getKey();
-                int numCores = entry.getValue().getNumCores();
-                int availableCores = numCores > 1 ? numCores - 1 : numCores; // Reserves one core for heartbeat.
-                int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax
-                        : perNodeParallelismMin;
-                int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? nodeParallelism
-                        : availableCores;
-                for (int count = 0; count < coresToUse; ++count) {
-                    locations.add(nodeId);
-                }
+            // If storage parallelism is not larger than the total number of cores, we use the storage parallelism.
+            // Otherwise, we will use all available cores.
+            if (parallelismHint == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE
+                    && storageLocations.getLocations().length <= totalNumCores) {
+                return storageLocations;
             }
-            return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
-        } catch (Exception e) {
+            return getComputationLocations(ncMap, parallelismHint);
+        } catch (HyracksException e) {
             throw new AlgebricksException(e);
         }
     }
 
+    // Computes the location constraints based on user-configured parallelism parameter.
+    // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large.
+    private AlgebricksAbsolutePartitionConstraint getComputationLocations(Map<String, NodeControllerInfo> ncMap,
+            int parallelismHint) {
+        // Unifies the handling of non-positive parallelism.
+        int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint;
+
+        // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger
+        // parallelism.
+        int numNodes = ncMap.size();
+        int numNodesWithOneMorePartition = parallelism % numNodes;
+        int perNodeParallelismMin = parallelism / numNodes;
+        int perNodeParallelismMax = parallelism / numNodes + 1;
+        List<String> allNodes = new ArrayList<>();
+        Set<String> selectedNodesWithOneMorePartition = new HashSet<>();
+        for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+            allNodes.add(entry.getKey());
+        }
+        Random random = new Random();
+        for (int index = numNodesWithOneMorePartition; index >= 1; --index) {
+            int pick = random.nextInt(index);
+            selectedNodesWithOneMorePartition.add(allNodes.get(pick));
+            Collections.swap(allNodes, pick, index - 1);
+        }
+
+        // Generates cluster locations, which has duplicates for a node if it contains more than one partitions.
+        List<String> locations = new ArrayList<>();
+        for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+            String nodeId = entry.getKey();
+            int availableCores = entry.getValue().getNumAvailableCores();
+            int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax
+                    : perNodeParallelismMin;
+            int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? nodeParallelism
+                    : availableCores;
+            for (int count = 0; count < coresToUse; ++count) {
+                locations.add(nodeId);
+            }
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+    }
+
+    // Gets the total number of available cores in the cluster.
+    private int getTotalNumCores(Map<String, NodeControllerInfo> ncMap) {
+        int sum = 0;
+        for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) {
+            sum += entry.getValue().getNumAvailableCores();
+        }
+        return sum;
+    }
+
     // Gets the frame limit.
     private int getFrameLimit(String parameter, long memBudgetInConfiguration, int frameSize) {
         IPropertyInterpreter<Long> longBytePropertyInterpreter = PropertyInterpreters.getLongBytePropertyInterpreter();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
new file mode 100644
index 0000000..3a6bfee
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+// The current implementation aggregates the memory requirement for each operator.
+// TODO(buyingyi): consider stages for calculating the memory requirement.
+public class RequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+    private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+    private final long numComputationPartitions;
+    private final long groupByMemorySize;
+    private final long joinMemorySize;
+    private final long sortMemorySize;
+    private final long frameSize;
+    private final IClusterCapacity clusterCapacity;
+    private final Set<ILogicalOperator> visitedOperators = new HashSet<>();
+    private long stageMemorySoFar = 0L;
+
+    public RequiredCapacityVisitor(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
+            int joinFrameLimit, int frameSize, IClusterCapacity clusterCapacity) {
+        this.numComputationPartitions = numComputationPartitions;
+        this.frameSize = frameSize;
+        this.groupByMemorySize = groupFrameLimit * (long) frameSize;
+        this.joinMemorySize = joinFrameLimit * (long) frameSize;
+        this.sortMemorySize = sortFrameLimit * (long) frameSize;
+        this.clusterCapacity = clusterCapacity;
+        this.clusterCapacity.setAggregatedCores(1); // At least one core is needed.
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        calculateMemoryUsageForBlockingOperators(op, groupByMemorySize);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        calculateMemoryUsageForBlockingOperators(op, sortMemorySize);
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        // Makes sure that the downstream of a replicate operator is only visited once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            visitInternal(op, true);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        // Makes sure that the downstream of a split operator is only visited once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            visitInternal(op, true);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        calculateMemoryUsageForExchange(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        visitInternal(op, true);
+        return null;
+    }
+
+    // Calculates the memory usage for exchange operators.
+    private void calculateMemoryUsageForExchange(ExchangeOperator op) throws AlgebricksException {
+        visitInternal(op, false);
+        IPhysicalOperator physicalOperator = op.getPhysicalOperator();
+        PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
+        if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
+                || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+            addOutputBuffer(op);
+            return;
+        }
+        stageMemorySoFar += 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions
+                * frameSize;
+        clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+    }
+
+    // Calculates the cluster-wide memory usage for blocking activities like group-by, sort, and join.
+    private void calculateMemoryUsageForBlockingOperators(ILogicalOperator op, long memSize)
+            throws AlgebricksException {
+        visitInternal(op, false);
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            stageMemorySoFar += memSize * numComputationPartitions;
+        } else {
+            stageMemorySoFar += memSize;
+        }
+        clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+    }
+
+    // Recursively visits the inputs of an operator, optionally adds its output buffer, and sets the CPU core usage.
+    private void visitInternal(ILogicalOperator op, boolean toAddOuputBuffer) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            inputOpRef.getValue().accept(this, null);
+        }
+        if (toAddOuputBuffer) {
+            addOutputBuffer(op);
+        }
+        setAvailableCores(op);
+    }
+
+    // Adds output buffer for an operator.
+    private void addOutputBuffer(ILogicalOperator op) {
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            stageMemorySoFar += frameSize * numComputationPartitions; // every operator needs one output buffer.
+        } else {
+            stageMemorySoFar += frameSize; // every operator needs one output buffer.
+        }
+        clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar);
+    }
+
+    // Sets the number of available cores
+    private void setAvailableCores(ILogicalOperator op) {
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            clusterCapacity.setAggregatedCores((int) numComputationPartitions);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 19c00db..5756e7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -30,7 +30,6 @@ import javax.servlet.Servlet;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.api.http.servlet.APIServlet;
-import org.apache.asterix.api.http.servlet.FullAPIServlet;
 import org.apache.asterix.api.http.servlet.ClusterAPIServlet;
 import org.apache.asterix.api.http.servlet.ClusterCCDetailsAPIServlet;
 import org.apache.asterix.api.http.servlet.ClusterNodeDetailsAPIServlet;
@@ -38,6 +37,7 @@ import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
 import org.apache.asterix.api.http.servlet.DDLAPIServlet;
 import org.apache.asterix.api.http.servlet.DiagnosticsAPIServlet;
 import org.apache.asterix.api.http.servlet.FeedServlet;
+import org.apache.asterix.api.http.servlet.FullAPIServlet;
 import org.apache.asterix.api.http.servlet.QueryAPIServlet;
 import org.apache.asterix.api.http.servlet.QueryResultAPIServlet;
 import org.apache.asterix.api.http.servlet.QueryServiceServlet;
@@ -47,8 +47,8 @@ import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
-import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -62,11 +62,13 @@ import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
+import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.util.AppContextInfo;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -85,6 +87,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
     private static IAsterixStateProxy proxy;
     protected ICCApplicationContext appCtx;
     protected CompilerExtensionManager ccExtensionManager;
+    private IJobCapacityController jobCapacityController;
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
@@ -131,6 +134,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
         ccAppCtx.setMessageBroker(messageBroker);
+
+        jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
     }
 
     protected List<AsterixExtension> getExtensions() {
@@ -330,6 +335,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
     }
 
+    @Override
+    public IJobCapacityController getJobCapacityController() {
+        return jobCapacityController;
+    }
+
     public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) {
         CCApplicationEntryPoint.proxy = proxy;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 7a4ff13..41d8b0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.hyracks.bootstrap;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -93,7 +94,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
     }
 
     @Override
-    public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
+    public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
         for (String deadNode : deadNodeIds) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
@@ -118,7 +119,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         }
     }
 
-    private void updateProgress(ClusterEventType eventType, Set<String> nodeIds) {
+    private void updateProgress(ClusterEventType eventType, Collection<String> nodeIds) {
         List<IClusterManagementWorkResponse> completedResponses = new ArrayList<IClusterManagementWorkResponse>();
         boolean isComplete = false;
         for (IClusterManagementWorkResponse resp : pendingWorkResponses) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index f64f998..d437b5b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -59,7 +60,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
     }
 
     @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+    public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) {
         setState(ClusterStateManager.INSTANCE.getState());
         ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(false);
         return Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8998c6b..bc270df 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
@@ -51,6 +52,7 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
@@ -261,6 +263,19 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         performLocalCleanUp();
     }
 
+    @Override
+    public NodeCapacity getCapacity() {
+        IPropertiesProvider propertiesProvider = (IPropertiesProvider) runtimeContext;
+        StorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        // Deducts the reserved buffer cache size and memory component size from the maximum heap size,
+        // and deducts one core for processing heartbeats.
+        long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize()
+                - storageProperties.getMemoryComponentGlobalBudget();
+        int allCores = Runtime.getRuntime().availableProcessors();
+        int maximumCoresForComputation = allCores > 1 ? allCores - 1 : allCores;
+        return new NodeCapacity(memorySize, maximumCoresForComputation);
+    }
+
     private void performLocalCleanUp() {
         //Delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d1d7ff7..d785cce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 
 public class CCMessageBroker implements ICCMessageBroker {
 
@@ -49,8 +50,8 @@ public class CCMessageBroker implements ICCMessageBroker {
 
     @Override
     public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        NodeControllerState state = nodeMap.get(nodeId);
+        INodeManager nodeManager = ccs.getNodeManager();
+        NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
new file mode 100644
index 0000000..50a21bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.app.resource.RequiredCapacityVisitor;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+public class ResourceUtils {
+
+    private ResourceUtils() {
+    }
+
+    /**
+     * Calculates the required cluster capacity from a given query plan, the computation locations,
+     * the operator memory budgets, and frame size.
+     *
+     * @param plan
+     *            a given query plan.
+     * @param computationLocations
+     *            the partitions for computation.
+     * @param sortFrameLimit
+     *            the frame limit for one sorter partition.
+     * @param groupFrameLimit
+     *            the frame limit for one group-by partition.
+     * @param joinFrameLimit
+     *            the frame limit for one joiner partition.
+     * @param frameSize
+     *            the frame size used in query execution.
+     * @return the required cluster capacity for executing the query.
+     * @throws AlgebricksException
+     *             if the query plan is malformed.
+     */
+    public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan,
+            AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
+            int joinFrameLimit, int frameSize)
+            throws AlgebricksException {
+        // Creates a cluster capacity visitor.
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
+                sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity);
+
+        // There could be only one root operator for a top-level query plan.
+        ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
+        rootOp.accept(visitor, null);
+        return clusterCapacity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
index a79053e..e041021 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
@@ -27,8 +27,10 @@ import static org.mockito.Mockito.when;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.junit.Assert;
@@ -39,15 +41,16 @@ import junit.extensions.PA;
 public class APIFrameworkTest {
 
     @Test
-    public void testGetComputationLocations() throws Exception {
+    public void testChooseLocations() throws Exception {
+        // Mocks cluster info collector.
         IClusterInfoCollector clusterInfoCollector = mock(IClusterInfoCollector.class);
 
         // Constructs mocked cluster nodes.
         Map<String, NodeControllerInfo> map = new HashMap<>();
         NodeControllerInfo nc1Info = mock(NodeControllerInfo.class);
-        when(nc1Info.getNumCores()).thenReturn(4);
+        when(nc1Info.getNumAvailableCores()).thenReturn(1);
         NodeControllerInfo nc2Info = mock(NodeControllerInfo.class);
-        when(nc2Info.getNumCores()).thenReturn(4);
+        when(nc2Info.getNumAvailableCores()).thenReturn(1);
         String nc1 = "nc1";
         String nc2 = "nc2";
         map.put(nc1, nc1Info);
@@ -57,10 +60,56 @@ public class APIFrameworkTest {
         // Creates an APIFramework.
         APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class));
 
+        // Tests large storage locations.
+        AlgebricksAbsolutePartitionConstraint storageLocations = new AlgebricksAbsolutePartitionConstraint(
+                new String[] { "node1", "node1", "node2" });
+        AlgebricksAbsolutePartitionConstraint computationLocations = (AlgebricksAbsolutePartitionConstraint) PA
+                .invokeMethod(apiFramework,
+                        "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+                                + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+                        clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+        Assert.assertTrue(computationLocations.getLocations().length == 2);
+
+        // Tests suitable storage locations.
+        storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1", "node2" });
+        computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
+                "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+                        + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+                clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+        Assert.assertTrue(computationLocations.getLocations().length == 2);
+
+        // Tests small storage locations.
+        storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1" });
+        computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
+                "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int,"
+                        + AlgebricksAbsolutePartitionConstraint.class.getName() + ")",
+                clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations);
+        Assert.assertTrue(computationLocations.getLocations().length == 1);
+
+        // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in
+        // APIFramework.chooseLocations(...).
+        verify(clusterInfoCollector, times(3)).getNodeControllerInfos();
+    }
+
+    @Test
+    public void testGetComputationLocations() throws AlgebricksException {
+        // Constructs mocked cluster nodes.
+        Map<String, NodeControllerInfo> map = new HashMap<>();
+        NodeControllerInfo nc1Info = mock(NodeControllerInfo.class);
+        when(nc1Info.getNumAvailableCores()).thenReturn(4);
+        NodeControllerInfo nc2Info = mock(NodeControllerInfo.class);
+        when(nc2Info.getNumAvailableCores()).thenReturn(4);
+        String nc1 = "nc1";
+        String nc2 = "nc2";
+        map.put(nc1, nc1Info);
+        map.put(nc2, nc2Info);
+
+        // Creates an APIFramework.
+        APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class));
+
         // Tests odd number parallelism.
         AlgebricksAbsolutePartitionConstraint loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(
-                apiFramework, "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)",
-                clusterInfoCollector, 5);
+                apiFramework, "getComputationLocations(java.util.Map,int)", map, 5);
         int nc1Count = 0, nc2Count = 0;
         String[] partitions = loc.getLocations();
         for (String partition : partitions) {
@@ -78,7 +127,7 @@ public class APIFrameworkTest {
 
         // Tests even number parallelism.
         loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
-                "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 8);
+                "getComputationLocations(java.util.Map,int)", map, 8);
         nc1Count = 0;
         nc2Count = 0;
         partitions = loc.getLocations();
@@ -93,40 +142,35 @@ public class APIFrameworkTest {
         Assert.assertTrue(nc1Count > 0);
         Assert.assertTrue(nc2Count > 0);
         Assert.assertTrue(Math.abs(nc1Count - nc2Count) == 0); // Tests load balance.
-        // The maximum parallelism cannot be beyond n *(#core-1), where n is the number of NCs and #core is the number
+        // The maximum parallelism cannot be beyond n * #core, where n is the number of NCs and #core is the number
         // of cores per NC.
-        Assert.assertTrue(partitions.length == 6);
+        Assert.assertTrue(partitions.length == 8);
 
         // Tests the case when parallelism is one.
         loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
-                "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 1);
+                "getComputationLocations(java.util.Map,int)", map, 1);
         Assert.assertTrue(loc.getLocations().length == 1);
 
         // Tests the case when parallelism is a negative.
         // In this case, the compiler has no idea and falls back to the default setting where all possible cores
         // are used.
         loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
-                "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector,
-                -100);
-        Assert.assertTrue(loc.getLocations().length == 6);
+                "getComputationLocations(java.util.Map,int)", map, -100);
+        Assert.assertTrue(loc.getLocations().length == 8);
 
         // Tests the case when parallelism is -1.
         // In this case, the compiler has no idea and falls back to the default setting where all possible cores
         // are used.
         loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
-                "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, -1);
-        Assert.assertTrue(loc.getLocations().length == 6);
+                "getComputationLocations(java.util.Map,int)", map, -1);
+        Assert.assertTrue(loc.getLocations().length == 8);
 
         // Tests the case when parallelism is zero.
         // In this case, the compiler has no idea and falls back to the default setting where all possible cores
         // are used.
         loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework,
-                "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 0);
-        Assert.assertTrue(loc.getLocations().length == 6);
-
-        // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in
-        // APIFramework.getComputationLocations(...).
-        verify(clusterInfoCollector, times(6)).getNodeControllerInfos();
+                "getComputationLocations(java.util.Map,int)", map, 0);
+        Assert.assertTrue(loc.getLocations().length == 8);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
new file mode 100644
index 0000000..cc18c31
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import java.util.Collections;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RequiredCapacityVisitorTest {
+
+    private static final long MEMORY_BUDGET = 33554432L;
+    private static final int FRAME_SIZE = 32768;
+    private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
+    private static final int PARALLELISM = 10;
+
+    @Test
+    public void testParallelGroupBy() throws AlgebricksException {
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+        // Constructs a parallel group-by query plan.
+        GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
+        globalGby.getInputs().add(new MutableObject<>(exchange));
+        exchange.getInputs().add(new MutableObject<>(localGby));
+
+        // Verifies the calculated cluster capacity requirement for the test query plan.
+        globalGby.accept(visitor, null);
+        Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM);
+        Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET * PARALLELISM
+                + 2 * FRAME_SIZE * PARALLELISM * PARALLELISM);
+    }
+
+    @Test
+    public void testUnPartitionedGroupBy() throws AlgebricksException {
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+        // Constructs an unpartitioned group-by query plan.
+        GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        globalGby.getInputs().add(new MutableObject<>(exchange));
+        exchange.getInputs().add(new MutableObject<>(localGby));
+
+        // Verifies the calculated cluster capacity requirement for the test query plan.
+        globalGby.accept(visitor, null);
+        Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1);
+        Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET + FRAME_SIZE);
+    }
+
+    @Test
+    public void testParallelJoin() throws AlgebricksException {
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+        // Constructs a join query plan.
+        InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+
+        // Left child plan of the join.
+        ExchangeOperator leftChildExchange = new ExchangeOperator();
+        leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        join.getInputs().add(new MutableObject<>(leftChildExchange));
+        leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        leftChild.getInputs().add(new MutableObject<>(ets));
+        leftChild.getInputs().add(new MutableObject<>(ets));
+
+        // Right child plan of the join.
+        ExchangeOperator rightChildExchange = new ExchangeOperator();
+        rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
+        join.getInputs().add(new MutableObject<>(rightChildExchange));
+        rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
+        rightChild.getInputs().add(new MutableObject<>(ets));
+
+        // Verifies the calculated cluster capacity requirement for the test query plan.
+        join.accept(visitor, null);
+        Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM);
+        Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET * PARALLELISM
+                + 2 * 2L * PARALLELISM * PARALLELISM * FRAME_SIZE + 3 * FRAME_SIZE * PARALLELISM);
+    }
+
+    @Test
+    public void testUnPartitionedJoin() throws AlgebricksException {
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity);
+
+        // Constructs a join query plan.
+        InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+
+        // Left child plan of the join.
+        ExchangeOperator leftChildExchange = new ExchangeOperator();
+        leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        leftChildExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        join.getInputs().add(new MutableObject<>(leftChildExchange));
+        leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        leftChild.getInputs().add(new MutableObject<>(ets));
+        leftChild.getInputs().add(new MutableObject<>(ets));
+
+        // Right child plan of the join.
+        ExchangeOperator rightChildExchange = new ExchangeOperator();
+        rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        rightChildExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        join.getInputs().add(new MutableObject<>(rightChildExchange));
+        rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
+        rightChild.getInputs().add(new MutableObject<>(ets));
+
+        // Verifies the calculated cluster capacity requirement for the test query plan.
+        join.accept(visitor, null);
+        Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1);
+        Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET + 5L * FRAME_SIZE);
+    }
+
+    private RequiredCapacityVisitor makeComputationCapacityVisitor(int numComputationPartitions,
+            IClusterCapacity clusterCapacity) {
+        return new RequiredCapacityVisitor(numComputationPartitions, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE,
+                clusterCapacity);
+    }
+
+    private GroupByOperator makeGroupByOperator(AbstractLogicalOperator.ExecutionMode exeMode) {
+        GroupByOperator groupByOperator = new GroupByOperator();
+        groupByOperator.setExecutionMode(exeMode);
+        return groupByOperator;
+    }
+
+    private InnerJoinOperator makeJoinOperator(AbstractLogicalOperator.ExecutionMode exeMode) {
+        InnerJoinOperator joinOperator = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        joinOperator.setExecutionMode(exeMode);
+        return joinOperator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
index 774747e..e0ae61c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp
@@ -20,6 +20,9 @@
 
 use tpcds;
 
+set `compiler.joinmemory` "4MB"
+set `compiler.groupmemory` "4MB"
+
 select case when (select value count(ss)
                   from store_sales ss
                   where ss_quantity >= 1 and ss_quantity <= 20) > 25437

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
index 3a98b82..fef4e31 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
@@ -18,6 +18,7 @@ package org.apache.asterix.common.api;
  * specific language governing permissions and limitations
  * under the License.
  */
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -28,7 +29,7 @@ public interface IClusterEventsSubscriber {
      * @param deadNodeIds
      * @return
      */
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
+    public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds);
 
     /**
      * @param joinedNodeId

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
index 9992009..a5ecc6b 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml
@@ -20,7 +20,7 @@
 
   <property>
     <name>nc.java.opts</name>
-    <value>-Xmx3096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value>
+    <value>-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value>
     <description>JVM parameters for each Node Contoller (NC)</description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
index 580aab7..34b873c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata.cluster;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -30,7 +31,7 @@ public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse {
         nodesToBeRemoved.addAll(w.getNodesToBeRemoved());
     }
 
-    public boolean updateProgress(Set<String> failedNodeIds) {
+    public boolean updateProgress(Collection<String> failedNodeIds) {
         nodesToBeRemoved.removeAll(failedNodeIds);
         return nodesToBeRemoved.isEmpty();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index 1ccdc76..6458fb0 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -140,5 +140,16 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
new file mode 100644
index 0000000..8ea1fa7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+
+// To avoid the computation cost for checking the capacity constraint for each node,
+// currently the admission/allocation decisions are based on the aggregated resource information.
+// TODO(buyingyi): investigate partition-aware resource control.
+public class JobCapacityController implements IJobCapacityController {
+
+    private final IResourceManager resourceManager;
+
+    public JobCapacityController(IResourceManager resourceManager) {
+        this.resourceManager = resourceManager;
+    }
+
+    @Override
+    public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+        IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
+        long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
+        int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
+        IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+        if (!(reqAggregatedMemoryByteSize <= maximumCapacity.getAggregatedMemoryByteSize()
+                && reqAggregatedNumCores <= maximumCapacity.getAggregatedCores())) {
+            throw HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, requiredCapacity.toString(),
+                    maximumCapacity.toString());
+        }
+        IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+        long currentAggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize();
+        int currentAggregatedAvailableCores = currentCapacity.getAggregatedCores();
+        if (!(reqAggregatedMemoryByteSize <= currentAggregatedMemoryByteSize
+                && reqAggregatedNumCores <= currentAggregatedAvailableCores)) {
+            return JobSubmissionStatus.QUEUE;
+        }
+        currentCapacity.setAggregatedMemoryByteSize(currentAggregatedMemoryByteSize - reqAggregatedMemoryByteSize);
+        currentCapacity.setAggregatedCores(currentAggregatedAvailableCores - reqAggregatedNumCores);
+        return JobSubmissionStatus.EXECUTE;
+    }
+
+    @Override
+    public void release(JobSpecification job) {
+        IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
+        long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
+        int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
+        IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+        long aggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize();
+        int aggregatedNumCores = currentCapacity.getAggregatedCores();
+        currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize);
+        currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
index ed93a2c..608def7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 
 /**
  * Utility class for obtaining information on the set of Hyracks NodeController
@@ -61,6 +62,7 @@ public class RuntimeUtils {
     public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
         ClusterControllerService ccs = (ClusterControllerService) AppContextInfo.INSTANCE
                 .getCCApplicationContext().getControllerService();
-        map.putAll(ccs.getIpAddressNodeNameMap());
+        INodeManager nodeManager = ccs.getNodeManager();
+        map.putAll(nodeManager.getIpAddressNodeNameMap());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
new file mode 100644
index 0000000..4a63885
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.job.resource;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.cc.scheduler.ResourceManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobCapacityControllerTest {
+
+    @Test
+    public void test() throws HyracksException {
+        IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33);
+        JobCapacityController capacityController = new JobCapacityController(resourceManager);
+
+        // Verifies the correctness of the allocate method.
+        Assert.assertTrue(capacityController.allocate(
+                makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        Assert.assertTrue(capacityController.allocate(
+                makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        Assert.assertTrue(capacityController.allocate(
+                makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+
+        boolean exceedCapacity = false;
+        try {
+            capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+        } catch (HyracksException e) {
+            exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY;
+        }
+        Assert.assertTrue(exceedCapacity);
+        Assert.assertTrue(capacityController.allocate(
+                makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        exceedCapacity = false;
+        try {
+            capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
+        } catch (HyracksException e) {
+            exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY;
+        }
+        Assert.assertTrue(exceedCapacity);
+
+        // Verifies that the release method does not leak resources.
+        capacityController.release(makeJobWithRequiredCapacity(4294967296L, 16));
+        Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize() == 4294967296L);
+        Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedCores() == 33);
+    }
+
+    private IResourceManager makeResourceManagerWithCapacity(long memorySize, int cores) throws HyracksException {
+        IResourceManager resourceManager = new ResourceManager();
+        resourceManager.update("node1", new NodeCapacity(memorySize, cores));
+        return resourceManager;
+    }
+
+    private JobSpecification makeJobWithRequiredCapacity(long memorySize, int cores) {
+        // Generates cluster capacity.
+        IClusterCapacity clusterCapacity = makeComputationCapacity(memorySize, cores);
+
+        // Generates a job.
+        JobSpecification job = mock(JobSpecification.class);
+        when(job.getRequiredClusterCapacity()).thenReturn(clusterCapacity);
+        return job;
+    }
+
+    private IClusterCapacity makeComputationCapacity(long memorySize, int cores) {
+        IClusterCapacity clusterCapacity = new ClusterCapacity();
+        clusterCapacity.setAggregatedMemoryByteSize(memorySize);
+        clusterCapacity.setAggregatedCores(cores);
+        return clusterCapacity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index 4a5f90f..83d421f 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -78,9 +78,11 @@ public class SampleLocalClusterIT {
 
     @Test
     public void test0_startCluster() throws Exception {
-        Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh"))
+        Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
                 .inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
+        process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
+        Assert.assertEquals(0, process.waitFor());
     }
 
     @Test