You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:39 UTC

[6/7] asterixdb git commit: Implements concurrent query management support.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index 92bc076..774c4d9 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -46,22 +46,42 @@ public class AlgebricksException extends Exception {
         return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public AlgebricksException(String message) {
         this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public AlgebricksException(Throwable cause) {
         this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public AlgebricksException(Throwable cause, String nodeId) {
         this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public AlgebricksException(String message, Throwable cause, String nodeId) {
         this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public AlgebricksException(String message, Throwable cause) {
         this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
index 9446756..7328278 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
@@ -59,7 +59,7 @@ public class SequentialFirstRuleCheckFixpointRuleController extends AbstractRule
         if (ruleCollection instanceof List) {
             rules = (List<IAlgebraicRewriteRule>) ruleCollection;
         } else {
-            throw AlgebricksException.create(ErrorCode.COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST,
+            throw AlgebricksException.create(ErrorCode.RULECOLLECTION_NOT_INSTANCE_OF_LIST,
                     this.getClass().getName());
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 7f132dd..2bb6cc2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -93,5 +93,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 8b83d83..5f4877d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * Application Context at the Cluster Controller for an application.
@@ -38,7 +39,7 @@ public interface ICCApplicationContext extends IApplicationContext {
      * @param state
      *            The distributed state
      */
-    public void setDistributedState(Serializable state);
+    void setDistributedState(Serializable state);
 
     /**
      * A listener that listens to Job Lifecycle events at the Cluster
@@ -46,21 +47,21 @@ public interface ICCApplicationContext extends IApplicationContext {
      *
      * @param jobLifecycleListener
      */
-    public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+    void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
 
     /**
      * A listener that listens to Cluster Lifecycle events at the Cluster
      * Controller.
      *
-     * @param jobLifecycleListener
+     * @param clusterLifecycleListener
      */
-    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+    void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
 
     /**
      * Get the Cluster Controller Context.
      *
      * @return The Cluster Controller Context.
      */
-    public ICCContext getCCContext();
+    ICCContext getCCContext();
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
index 9f7f222..c11cc7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+
 public interface ICCApplicationEntryPoint {
-    public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+    void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
 
-    public void stop() throws Exception;
+    void stop() throws Exception;
 
     void startupCompleted() throws Exception;
+
+    IJobCapacityController getJobCapacityController();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index a9bef18..191a4af 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -47,6 +48,6 @@ public interface IClusterLifecycleListener {
      * @param deadNodeIds
      *            A set of Node Controller Ids that have left the cluster. The set is not cumulative.
      */
-    public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException;
+    public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException;
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
index ea850c5..dea6e4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
 public interface INCApplicationEntryPoint {
-    public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+    void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+
+    void notifyStartupComplete() throws Exception;
 
-    public void notifyStartupComplete() throws Exception;
+    void stop() throws Exception;
 
-    public void stop() throws Exception;
+    NodeCapacity getCapacity();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c90644f..57f389f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -35,16 +35,16 @@ public class NodeControllerInfo implements Serializable {
 
     private final NetworkAddress messagingNetworkAddress;
 
-    private final int numCores;
+    private final int numAvailableCores;
 
     public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
-            NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numCores) {
+            NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
         this.datasetNetworkAddress = datasetNetworkAddress;
         this.messagingNetworkAddress = messagingNetworkAddress;
-        this.numCores = numCores;
+        this.numAvailableCores = numAvailableCores;
     }
 
     public String getNodeId() {
@@ -67,7 +67,7 @@ public class NodeControllerInfo implements Serializable {
         return messagingNetworkAddress;
     }
 
-    public int getNumCores() {
-        return numCores;
+    public int getNumAvailableCores() {
+        return numAvailableCores;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
-        acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-        acg.setReportTaskDetails(spec.isReportTaskDetails());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 5a67188..1ca8bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -27,19 +27,28 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
 /**
  * A registry of runtime/compile error codes
  * Error code:
- * 0 --- 999: runtime errors
- * 1000 ---- 1999: compilation errors
+ * 0 --- 9999: runtime errors
+ * 10000 --- 19999: compilation errors
  */
 public class ErrorCode {
     private static final String RESOURCE_PATH = "errormsg" + File.separator + "en.properties";
     public static final String HYRACKS = "HYR";
 
+    // Runtime error codes.
     public static final int INVALID_OPERATOR_OPERATION = 1;
     public static final int ERROR_PROCESSING_TUPLE = 2;
     public static final int FAILURE_ON_NODE = 3;
-    public static final int RUNTIME_FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
-    public static final int RUNTIME_FULLTEXT_PHRASE_FOUND = 5;
-    public static final int COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST = 1001;
+    public static final int FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
+    public static final int FULLTEXT_PHRASE_FOUND = 5;
+    public static final int JOB_QUEUE_FULL = 6;
+    public static final int INVALID_NETWORK_ADDRESS = 7;
+    public static final int INVALID_INPUT_PARAMETER = 8;
+    public static final int JOB_REQUIREMENTS_EXCEED_CAPACITY = 9;
+    public static final int NO_SUCH_NODE = 10;
+    public static final int CLASS_LOADING_ISSUE = 11;
+
+    // Compilation error codes.
+    public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
 
     // Loads the map that maps error codes to error message templates.
     private static Map<Integer, String> errorMessageMap = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 56be93e..0fd6923 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.api.exceptions;
 
 import java.io.Serializable;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -27,14 +27,6 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
  * The main execution time exception type for runtime errors in a hyracks environment
  */
 public class HyracksDataException extends HyracksException {
-    private static final long serialVersionUID = 1L;
-
-    public static final int UNKNOWN = 0;
-    private final String component;
-    private final int errorCode;
-    private final Serializable[] params;
-    private final String nodeId;
-    private transient volatile String msgCache;
 
     public static HyracksDataException create(int code, Serializable... params) {
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
@@ -46,76 +38,68 @@ public class HyracksDataException extends HyracksException {
 
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
-        super(message, cause);
-        this.component = component;
-        this.errorCode = errorCode;
-        this.nodeId = nodeId;
-        this.params = params;
+        super(component, errorCode, message, cause, nodeId, params);
     }
 
+    /**
+     * @deprecated Leaves the error code as UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public HyracksDataException(String message) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+        super(message);
     }
 
+    /**
+     * @deprecated Leaves the error code as UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public HyracksDataException(Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+        super(cause);
     }
 
+    /**
+     * @deprecated Leaves the error code as UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public HyracksDataException(Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+        super(cause, nodeId);
     }
 
+    /**
+     * @deprecated Leaves the error code as UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public HyracksDataException(String message, Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+        super(message, cause, nodeId);
     }
 
+    /**
+     * @deprecated Leaves the error code as UNKNOWN; use a constructor or factory that takes an error code.
+     */
+    @Deprecated
     public HyracksDataException(String message, Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+        super(message, cause);
     }
 
     public HyracksDataException(String component, int errorCode, Serializable... params) {
-        this(component, errorCode, null, null, null, params);
+        super(component, errorCode, null, null, null, params);
     }
 
     public HyracksDataException(Throwable cause, int errorCode, Serializable... params) {
-        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+        super(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, String message, Serializable... params) {
-        this(component, errorCode, message, null, null, params);
+        super(component, errorCode, message, null, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) {
-        this(component, errorCode, cause.getMessage(), cause, null, params);
+        super(component, errorCode, cause.getMessage(), cause, null, params);
     }
 
     public HyracksDataException(String component, int errorCode, String message, Throwable cause,
             Serializable... params) {
-        this(component, errorCode, message, cause, null, params);
-    }
-
-    public String getComponent() {
-        return component;
-    }
-
-    public int getErrorCode() {
-        return errorCode;
-    }
-
-    public Object[] getParams() {
-        return params;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String getMessage() {
-        if (msgCache == null) {
-            msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
-        }
-        return msgCache;
+        super(component, errorCode, message, cause, null, params);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index e939d26..5d13212 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -19,22 +19,118 @@
 package org.apache.hyracks.api.exceptions;
 
 import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.util.ErrorMessageUtil;
 
 public class HyracksException extends IOException {
     private static final long serialVersionUID = 1L;
 
-    public HyracksException() {
+    public static final int UNKNOWN = 0;
+    private final String component;
+    private final int errorCode;
+    private final Serializable[] params;
+    private final String nodeId;
+    private transient volatile String msgCache;
+
+    public static HyracksException create(int code, Serializable... params) {
+        return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
+    }
+
+    public static HyracksException create(int code, Throwable cause, Serializable... params) {
+        return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
     }
 
+    public HyracksException(String component, int errorCode, String message, Throwable cause, String nodeId,
+            Serializable... params) {
+        super(message, cause);
+        this.component = component;
+        this.errorCode = errorCode;
+        this.nodeId = nodeId;
+        this.params = params;
+    }
+
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use {@code create} or a constructor that takes an error code.
+     */
+    @Deprecated
     public HyracksException(String message) {
-        super(message);
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
     }
 
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use {@code create} or a constructor that takes an error code.
+     */
+    @Deprecated
     public HyracksException(Throwable cause) {
-        super(cause);
+        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+    }
+
+    /**
+     * @deprecated Sets the error code to UNKNOWN; use {@code create} or a constructor that takes an error code.
+     */
+    @Deprecated
+    public HyracksException(Throwable cause, String nodeId) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+    }
+
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
+    public HyracksException(String message, Throwable cause, String nodeId) {
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
     }
 
+    /**
+     * @deprecated Error code is needed.
+     */
+    @Deprecated
     public HyracksException(String message, Throwable cause) {
-        super(message, cause);
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+    }
+
+    public HyracksException(String component, int errorCode, Serializable... params) {
+        this(component, errorCode, null, null, null, params);
+    }
+
+    public HyracksException(Throwable cause, int errorCode, Serializable... params) {
+        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, String message, Serializable... params) {
+        this(component, errorCode, message, null, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, Throwable cause, Serializable... params) {
+        this(component, errorCode, cause.getMessage(), cause, null, params);
+    }
+
+    public HyracksException(String component, int errorCode, String message, Throwable cause, Serializable... params) {
+        this(component, errorCode, message, cause, null, params);
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    public Object[] getParams() {
+        return params;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String getMessage() {
+        if (msgCache == null) {
+            msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
+        }
+        return msgCache;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..5787c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,10 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
@@ -44,6 +40,12 @@ import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
     private static final long serialVersionUID = 1L;
@@ -76,11 +78,9 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
-    private IGlobalJobDataFactory globalJobDataFactory;
-
     private boolean useConnectorPolicyForScheduling;
 
-    private boolean reportTaskDetails;
+    private IClusterCapacity requiredClusterCapacity;
 
     private transient int operatorIdCounter;
 
@@ -106,7 +106,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         connectorIdCounter = 0;
         maxReattempts = 2;
         useConnectorPolicyForScheduling = false;
-        reportTaskDetails = true;
+        requiredClusterCapacity = new ClusterCapacity();
         setFrameSize(frameSize);
     }
 
@@ -281,14 +281,6 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         this.jobletEventListenerFactory = jobletEventListenerFactory;
     }
 
-    public IGlobalJobDataFactory getGlobalJobDataFactory() {
-        return globalJobDataFactory;
-    }
-
-    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
-        this.globalJobDataFactory = globalJobDataFactory;
-    }
-
     public boolean isUseConnectorPolicyForScheduling() {
         return useConnectorPolicyForScheduling;
     }
@@ -297,12 +289,12 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
-    public boolean isReportTaskDetails() {
-        return reportTaskDetails;
+    public void setRequiredClusterCapacity(IClusterCapacity capacity) {
+        this.requiredClusterCapacity = capacity;
     }
 
-    public void setReportTaskDetails(boolean reportTaskDetails) {
-        this.reportTaskDetails = reportTaskDetails;
+    public IClusterCapacity getRequiredClusterCapacity() {
+        return requiredClusterCapacity;
     }
 
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..50db00d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.api.job;
 
 public enum JobStatus {
-    INITIALIZED,
+    PENDING,
     RUNNING,
     TERMINATED,
     FAILURE,
+    FAILURE_BEFORE_EXECUTION
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
new file mode 100644
index 0000000..ded4b63
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+public class ClusterCapacity implements IClusterCapacity {
+
+    private long aggregatedMemoryByteSize = 0;
+    private int aggregatedCores = 0;
+    private final Map<String, Long> nodeMemoryMap = new HashMap<>();
+    private final Map<String, Integer> nodeCoreMap = new HashMap<>();
+
+    @Override
+    public long getAggregatedMemoryByteSize() {
+        return aggregatedMemoryByteSize;
+    }
+
+    @Override
+    public int getAggregatedCores() {
+        return aggregatedCores;
+    }
+
+    @Override
+    public long getMemoryByteSize(String nodeId) throws HyracksException {
+        if (!nodeMemoryMap.containsKey(nodeId)) {
+            throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+        }
+        return nodeMemoryMap.get(nodeId);
+    }
+
+    @Override
+    public int getCores(String nodeId) throws HyracksException {
+        if (!nodeMemoryMap.containsKey(nodeId)) {
+            throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+        }
+        return nodeCoreMap.get(nodeId);
+    }
+
+    @Override
+    public void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize) {
+        this.aggregatedMemoryByteSize = aggregatedMemoryByteSize;
+    }
+
+    @Override
+    public void setAggregatedCores(int aggregatedCores) {
+        this.aggregatedCores = aggregatedCores;
+    }
+
+    @Override
+    public void setMemoryByteSize(String nodeId, long memoryByteSize) {
+        nodeMemoryMap.put(nodeId, memoryByteSize);
+    }
+
+    @Override
+    public void setCores(String nodeId, int cores) {
+        nodeCoreMap.put(nodeId, cores);
+    }
+
+    @Override
+    public void update(String nodeId, NodeCapacity nodeCapacity) throws HyracksException {
+        // Removes the existing node resource and the aggregated resource statistics.
+        if (nodeMemoryMap.containsKey(nodeId)) {
+            aggregatedMemoryByteSize -= nodeMemoryMap.remove(nodeId);
+        }
+        if (nodeCoreMap.containsKey(nodeId)) {
+            aggregatedCores -= nodeCoreMap.remove(nodeId);
+        }
+
+        long memorySize = nodeCapacity.getMemoryByteSize();
+        int cores = nodeCapacity.getCores();
+        // Updates the node capacity map when both memory size and cores are positive.
+        if (memorySize > 0 && cores > 0) {
+            aggregatedMemoryByteSize += memorySize;
+            aggregatedCores += cores;
+            nodeMemoryMap.put(nodeId, memorySize);
+            nodeCoreMap.put(nodeId, cores);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return ObjectUtils.hashCodeMulti(aggregatedMemoryByteSize, aggregatedCores, nodeMemoryMap,
+                nodeCoreMap);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClusterCapacity)) {
+            return false;
+        }
+        ClusterCapacity capacity = (ClusterCapacity) o;
+        return aggregatedMemoryByteSize == capacity.aggregatedMemoryByteSize
+                && aggregatedCores == capacity.aggregatedCores
+                && ObjectUtils.equals(nodeMemoryMap, capacity.nodeMemoryMap)
+                && ObjectUtils.equals(nodeCoreMap, capacity.nodeCoreMap);
+    }
+
+    @Override
+    public String toString() {
+        return "capacity (memory: " + aggregatedMemoryByteSize + " bytes, CPU cores: " + aggregatedCores + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
new file mode 100644
index 0000000..9e38a20
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * A capacity controller that admits every job for immediate execution and keeps no capacity bookkeeping.
+ */
+public class DefaultJobCapacityController implements IJobCapacityController {
+
+    public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+
+    private DefaultJobCapacityController() {
+        // Not instantiable from outside; use INSTANCE.
+    }
+
+    @Override
+    public void release(JobSpecification job) {
+        // Nothing was allocated, so there is nothing to give back.
+    }
+
+    @Override
+    public JobSubmissionStatus allocate(JobSpecification job) {
+        // Unconditionally admits the job.
+        return JobSubmissionStatus.EXECUTE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
new file mode 100644
index 0000000..ac3261d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface abstracts the mutable capacity of a cluster, adding mutators to {@link IReadOnlyClusterCapacity}.
+ */
+public interface IClusterCapacity extends IReadOnlyClusterCapacity {
+
+    /**
+     * Sets the aggregated memory size for a cluster.
+     *
+     * @param aggregatedMemoryByteSize
+     *            the aggregated memory size, in bytes.
+     */
+    void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+    /**
+     * Sets the aggregated number of CPU cores for a cluster.
+     *
+     * @param aggregatedCores
+     *            the total number of cores.
+     */
+    void setAggregatedCores(int aggregatedCores);
+
+    /**
+     * Sets the memory byte size (for computation) of a specific node.
+     *
+     * @param nodeId
+     *            the node id.
+     * @param memoryByteSize
+     *            the available memory byte size for computation of the node.
+     */
+    void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+    /**
+     * Sets the number of CPU cores for a specific node.
+     *
+     * @param nodeId
+     *            the node id.
+     * @param cores
+     *            the number of CPU cores for the node.
+     */
+    void setCores(String nodeId, int cores);
+
+    /**
+     * Updates the cluster capacity information with the capacity of one particular node.
+     *
+     * @param nodeId
+     *            the id of the node for updating.
+     * @param capacity
+     *            the capacity of one particular node.
+     * @throws HyracksException
+     *             when the parameters are invalid.
+     */
+    void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
new file mode 100644
index 0000000..5fa4bd9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * This interface determines the behavior of a job when it is submitted to the job manager.
+ * The job falls into one of the following three cases:
+ * -- rejected immediately ({@link #allocate} throws) because its requirement exceeds the cluster's capacity;
+ * -- entered into a pending job queue for deferred execution ({@code QUEUE}), due to the current capacity
+ * limitation caused by concurrently running jobs;
+ * -- executed immediately ({@code EXECUTE}) because there is sufficient capacity.
+ */
+public interface IJobCapacityController {
+
+    enum JobSubmissionStatus {
+        EXECUTE,
+        QUEUE
+    }
+
+    /**
+     * Allocates required cluster capacity for a job.
+     *
+     * @param job
+     *            the job specification.
+     * @return EXECUTE, if the job can be executed immediately;
+     *         QUEUE, if the job cannot be executed immediately and has to be queued.
+     * @throws HyracksException
+     *             if the job's capacity requirement exceeds the maximum capacity of the cluster.
+     */
+    JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+
+    /**
+     * Releases cluster capacity for a job when it completes.
+     *
+     * @param job
+     *            the job specification.
+     */
+    void release(JobSpecification job);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
new file mode 100644
index 0000000..59b6bfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface provides read-only methods for the capacity of a cluster.
+ */
+public interface IReadOnlyClusterCapacity extends Serializable {
+
+    /**
+     * @return the aggregated memory byte size for the cluster.
+     */
+    long getAggregatedMemoryByteSize();
+
+    /**
+     * @return the aggregated number of CPU cores for the cluster.
+     */
+    int getAggregatedCores();
+
+    /**
+     * Retrieves the memory byte size for computation on a specific node.
+     * (Note that usually a portion of memory is used for storage.)
+     *
+     * @param nodeId
+     *            the node id.
+     * @return the memory byte size for computation on the node.
+     * @throws HyracksException
+     *             when the input node does not exist.
+     */
+    long getMemoryByteSize(String nodeId) throws HyracksException;
+
+    /**
+     * Retrieves the number of CPU cores for computation on a specific node.
+     *
+     * @param nodeId
+     *            the node id.
+     * @return the number of CPU cores for computation on the node.
+     * @throws HyracksException
+     *             when the input node does not exist.
+     */
+    int getCores(String nodeId) throws HyracksException;
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
new file mode 100644
index 0000000..7902e7d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+/**
+ * Specifies the capacity for computation on a particular node, i.e., a NCDriver process.
+ */
+public class NodeCapacity implements Serializable {
+
+    // All memory for computations -- this is not changed during the lifetime of a running instance.
+    private final long memoryByteSize;
+
+    // All CPU cores -- currently we assume that it doesn't change during the lifetime of a running instance.
+    // Otherwise, for each heartbeat, we will have to update global cluster capacity of a cluster.
+    private final int cores;
+
+    /**
+     * NOTE: neither parameter should be negative; this constructor stores the values without validating them.
+     * However, both of them can be zero, which means the node is to be removed from the cluster.
+     *
+     * @param memorySize
+     *            the memory size of the node, in bytes.
+     * @param cores
+     *            the number of cores of the node.
+     */
+    public NodeCapacity(long memorySize, int cores) {
+        this.memoryByteSize = memorySize;
+        this.cores = cores;
+    }
+
+    public long getMemoryByteSize() {
+        return memoryByteSize;
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 52367ee..d17c9aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -17,9 +17,18 @@
 # under the License.
 #
 
-1 = Unsupported operation %1$s in %2$s operator
-2 = Error in processing tuple %1$s in a frame
-4 = The file with absolute path %1$s is not within any of the current IO devices
-5 = Phrase search in Full-text is not supported. An expression should include only one word
+# 0 --- 9999: runtime errors
+# 10000 ---- 19999: compilation errors
 
-1001 = The given rule collection %1$s is not an instance of the List class.
\ No newline at end of file
+1=Unsupported operation %1$s in %2$s operator
+2=Error in processing tuple %1$s in a frame
+4=The file with absolute path %1$s is not within any of the current IO devices
+5=Phrase search in Full-text is not supported. An expression should include only one word
+6=Job queue is full with %1$s jobs
+7=Network address cannot be resolved -- %1$s
+8=Invalid internal input parameter
+9=Job requirement %1$s exceeds capacity %2$s
+10=Node %1$s does not exist
+11=Class loading issue: %1$s
+
+10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
new file mode 100644
index 0000000..277e8e2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClusterCapacityTest {
+
+    @Test
+    public void test() throws HyracksException {
+        ClusterCapacity capacity = new ClusterCapacity();
+        String nodeId = "node1";
+
+        // Adds one node.
+        capacity.update(nodeId, new NodeCapacity(1024L, 8));
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 1024L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 8);
+
+        // Updates the node.
+        capacity.update(nodeId, new NodeCapacity(-1L, -2));
+
+        // Verifies that node is removed
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 0);
+
+        boolean nodeNotExist = false;
+        try {
+            capacity.getMemoryByteSize(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+        nodeNotExist = false;
+        try {
+            capacity.getCores(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+
+        // Adds the node again.
+        capacity.update(nodeId, new NodeCapacity(1024L, 8));
+        // Updates the node.
+        capacity.update(nodeId, new NodeCapacity(4L, 0));
+
+        // Verifies that node does not exist
+        Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+        Assert.assertTrue(capacity.getAggregatedCores() == 0);
+        nodeNotExist = false;
+        try {
+            capacity.getMemoryByteSize(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+        nodeNotExist = false;
+        try {
+            capacity.getCores(nodeId);
+        } catch (HyracksException e) {
+            nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+        }
+        Assert.assertTrue(nodeNotExist);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index e949e57..105d47f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -113,5 +113,16 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>2.0.2-beta</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 26beb63..7ea5f70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -72,13 +72,13 @@ class ClientInterfaceIPCI implements IIPCI {
             case GET_JOB_STATUS:
                 HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
                         (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
-                ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, gjsf.getJobId(),
-                        new IPCResponder<JobStatus>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new GetJobStatusWork(ccs.getJobManager(), gjsf.getJobId(), new IPCResponder<>(handle, mid)));
                 break;
             case GET_JOB_INFO:
                 HyracksClientInterfaceFunctions.GetJobInfoFunction gjif =
                         (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
-                ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(),
+                ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
             case START_JOB:
@@ -118,8 +118,8 @@ class ClientInterfaceIPCI implements IIPCI {
                         new IPCResponder<>(handle, mid)));
                 break;
             case GET_NODE_CONTROLLERS_INFO:
-                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
-                        new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new GetNodeControllersInfoWork(ccs.getNodeManager(), new IPCResponder<>(handle, mid)));
                 break;
             case GET_CLUSTER_TOPOLOGY:
                 try {
@@ -149,7 +149,8 @@ class ClientInterfaceIPCI implements IIPCI {
             case GET_NODE_DETAILS_JSON:
                 HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
                         (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
-                ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, gndjf.getNodeId(),
+                ccs.getWorkQueue()
+                        .schedule(new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), gndjf.getNodeId(),
                         gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
                 break;
             case THREAD_DUMP:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index b6c9a08..21fcf92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -68,7 +68,7 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case UNREGISTER_NODE:
                 CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
-                ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, unf.getNodeId()));
+                ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs.getNodeManager(), unf.getNodeId()));
                 break;
             case NODE_HEARTBEAT:
                 CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
@@ -87,7 +87,7 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case REPORT_PROFILE:
                 CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
-                ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, rpf.getProfiles()));
+                ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs.getJobManager(), rpf.getProfiles()));
                 break;
             case NOTIFY_TASK_COMPLETE:
                 CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
@@ -137,7 +137,7 @@ class ClusterControllerIPCI implements IIPCI {
                         rsf.getDeploymentId(), rsf.getNodeId()));
                 break;
             case GET_NODE_CONTROLLERS_INFO:
-                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
                         new IResultCallback<Map<String, NodeControllerInfo>>() {
                             @Override
                             public void setValue(Map<String, NodeControllerInfo> result) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..1a363c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,12 +20,12 @@ package org.apache.hyracks.control.cc;
 
 import java.io.File;
 import java.io.FileReader;
+import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,17 +43,22 @@ import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
 import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
 import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.cc.scheduler.ResourceManager;
 import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
 import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
@@ -83,10 +88,6 @@ public class ClusterControllerService implements IControllerService {
 
     private final LogFile jobLog;
 
-    private final Map<String, NodeControllerState> nodeRegistry;
-
-    private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
-
     private final ServerContext serverCtx;
 
     private final WebServer webServer;
@@ -95,12 +96,6 @@ public class ClusterControllerService implements IControllerService {
 
     private CCApplicationContext appCtx;
 
-    private final Map<JobId, JobRun> activeRunMap;
-
-    private final Map<JobId, JobRun> runMapArchive;
-
-    private final Map<JobId, List<Exception>> runMapHistory;
-
     private final WorkQueue workQueue;
 
     private ExecutorService executor;
@@ -119,6 +114,12 @@ public class ClusterControllerService implements IControllerService {
 
     private final Map<String, ThreadDumpRun> threadDumpRunMap;
 
+    private final INodeManager nodeManager;
+
+    private final IResourceManager resourceManager = new ResourceManager();
+
+    private IJobManager jobManager;
+
     private ShutdownRun shutdownCallback;
 
     private ICCApplicationEntryPoint aep;
@@ -127,8 +128,6 @@ public class ClusterControllerService implements IControllerService {
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
-        nodeRegistry = new LinkedHashMap<>();
-        ipAddressNodeNameMap = new HashMap<>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -137,25 +136,7 @@ public class ClusterControllerService implements IControllerService {
         clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
-        activeRunMap = new HashMap<>();
-        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
-            private static final long serialVersionUID = 1L;
 
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
-                return size() > ccConfig.jobHistorySize;
-            }
-        };
-        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
-            private static final long serialVersionUID = 1L;
-            /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
-
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
-                return size() > allowedSize;
-            }
-        };
         // WorkQueue is in charge of heartbeat as well as other events.
         workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
         this.timer = new Timer(true);
@@ -167,6 +148,9 @@ public class ClusterControllerService implements IControllerService {
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
         threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
+
+        // Node manager is in charge of cluster membership management.
+        nodeManager = new NodeManager(ccConfig, resourceManager);
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -207,13 +191,22 @@ public class ClusterControllerService implements IControllerService {
         appCtx.addJobLifecycleListener(datasetDirectoryService);
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
         String className = ccConfig.appCCMainClass;
+
+        IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
         if (className != null) {
             Class<?> c = Class.forName(className);
             aep = (ICCApplicationEntryPoint) c.newInstance();
             String[] args = ccConfig.appArgs == null ? null
                     : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
             aep.start(appCtx, args);
+            jobCapacityController = aep.getJobCapacityController();
         }
+
+        // Job manager is in charge of job lifecycle management.
+        Constructor<?> jobManagerConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobManagerClassName)
+                .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+        jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
+
     }
 
     private void connectNCs() throws Exception {
@@ -301,20 +294,16 @@ public class ClusterControllerService implements IControllerService {
         return ccContext;
     }
 
-    public Map<JobId, JobRun> getActiveRunMap() {
-        return activeRunMap;
-    }
-
-    public Map<JobId, JobRun> getRunMapArchive() {
-        return runMapArchive;
+    public IJobManager getJobManager() {
+        return jobManager;
     }
 
-    public Map<JobId, List<Exception>> getRunHistory() {
-        return runMapHistory;
+    public INodeManager getNodeManager() {
+        return nodeManager;
     }
 
-    public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
-        return ipAddressNodeNameMap;
+    public IResourceManager getResourceManager() {
+        return resourceManager;
     }
 
     public LogFile getJobLogFile() {
@@ -329,10 +318,6 @@ public class ClusterControllerService implements IControllerService {
         return executor;
     }
 
-    public Map<String, NodeControllerState> getNodeMap() {
-        return nodeRegistry;
-    }
-
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -366,7 +351,8 @@ public class ClusterControllerService implements IControllerService {
 
         @Override
         public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
-            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
+            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
+                    ClusterControllerService.this.getNodeManager(), map);
             try {
                 workQueue.scheduleAndSync(ginmw);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index bf94dff..955b7f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -25,10 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -36,6 +35,9 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
 
@@ -141,7 +143,7 @@ public class NodeControllerState {
 
     private int lastHeartbeatDuration;
 
-    private int numCores;
+    private NodeCapacity capacity;
 
     public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
         this.nodeController = nodeController;
@@ -204,7 +206,7 @@ public class NodeControllerState {
         diskWrites = new long[RRD_SIZE];
 
         rrdPtr = 0;
-        numCores = 0;
+        capacity = reg.getCapacity();
     }
 
     public synchronized void notifyHeartbeat(HeartbeatData hbData) {
@@ -242,7 +244,6 @@ public class NodeControllerState {
             diskReads[rrdPtr] = hbData.diskReads;
             diskWrites[rrdPtr] = hbData.diskWrites;
             rrdPtr = (rrdPtr + 1) % RRD_SIZE;
-            numCores = hbData.numCores;
         }
     }
 
@@ -250,10 +251,6 @@ public class NodeControllerState {
         return lastHeartbeatDuration++;
     }
 
-    public int getLastHeartbeatDuration() {
-        return lastHeartbeatDuration;
-    }
-
     public INodeController getNodeController() {
         return nodeController;
     }
@@ -277,8 +274,9 @@ public class NodeControllerState {
     public NetworkAddress getMessagingPort() {
         return messagingPort;
     }
-    public int getNumCores() {
-        return numCores;
+
+    public NodeCapacity getCapacity() {
+        return capacity;
     }
 
     public synchronized ObjectNode toSummaryJSON()  {
@@ -324,6 +322,8 @@ public class NodeControllerState {
             o.putPOJO("nonheap-used-sizes", nonheapUsedSize);
             o.putPOJO("nonheap-committed-sizes", nonheapCommittedSize);
             o.putPOJO("nonheap-max-sizes", nonheapMaxSize);
+            o.putPOJO("application-memory-budget", capacity.getMemoryByteSize());
+            o.putPOJO("application-cpu-core-budget", capacity.getCores());
             o.putPOJO("thread-counts", threadCount);
             o.putPOJO("peak-thread-counts", peakThreadCount);
             o.putPOJO("system-load-averages", systemLoadAverage);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
index 680c2a7..443fa88 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
@@ -42,7 +42,7 @@ public class IndexPage extends AbstractPage {
     public IndexPage() throws Exception {
         ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
 
-        GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
+        GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs.getNodeManager());
         ccs.getWorkQueue().scheduleAndSync(gnse);
         ArrayNode nodeSummaries = gnse.getSummaries();
         add(new Label("node-count", String.valueOf(nodeSummaries.size())));
@@ -63,7 +63,7 @@ public class IndexPage extends AbstractPage {
         };
         add(nodeList);
 
-        GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
+        GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager());
         ccs.getWorkQueue().scheduleAndSync(gjse);
         ArrayNode jobSummaries = gjse.getSummaries();
         ListView<JsonNode> jobList = new ListView<JsonNode>("jobs-list", Lists.newArrayList(jobSummaries.iterator())) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index abc07d9..68add85 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -79,7 +79,7 @@ public class JobDetailsPage extends AbstractPage {
             }
         }
 
-        GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs, jobId);
+        GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs.getJobManager(), jobId);
         ccs.getWorkQueue().scheduleAndSync(gjrw);
         Label jobrun = new Label("job-run", gjrw.getJSON().toString());
         jobrun.setEscapeModelStrings(false);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dd6f83b..e43a59d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.cc.application;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -110,7 +111,7 @@ public class CCApplicationContext extends ApplicationContext implements ICCAppli
         }
     }
 
-    public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
+    public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
         for (IClusterLifecycleListener l : clusterLifecycleListeners) {
             l.notifyNodeFailure(deadNodeIds);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
new file mode 100644
index 0000000..40d81f8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.cluster;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.NodeControllerState;
+
+/**
+ * This interface provides abstractions for a node manager, which manages the node membership in a cluster.
+ */
+public interface INodeManager {
+
+    /**
+     * A functional interface for applying a function for each node.
+     */
+    @FunctionalInterface
+    interface NodeFunction {
+        void apply(String nodeId, NodeControllerState ncState);
+    }
+
+    /**
+     * Applies a function for each node in the cluster.
+     *
+     * @param nodeFunction
+     *            a function implementation that follows the <code>NodeFunction</code> interface.
+     */
+    void apply(NodeFunction nodeFunction);
+
+    /**
+     * @return all node ids.
+     */
+    Collection<String> getAllNodeIds();
+
+    /**
+     * @return all node controller states.
+     */
+    Collection<NodeControllerState> getAllNodeControllerStates();
+
+    /**
+     * @return the map that maps an IP address to a set of node names.
+     */
+    Map<InetAddress, Set<String>> getIpAddressNodeNameMap();
+
+    /**
+     * @return the map that maps a node id to its corresponding node controller info.
+     */
+    Map<String, NodeControllerInfo> getNodeControllerInfoMap();
+
+    /**
+     * Removes all nodes that are considered "dead", i.e., nodes that have run out of heartbeats.
+     *
+     * @return all dead nodes and their impacted jobs.
+     * @throws HyracksException
+     *             when any IP address given in the dead nodes is not valid
+     */
+    Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException;
+
+    /**
+     * Retrieves the node controller state from a given node id.
+     *
+     * @param nodeId
+     *            a given node id.
+     * @return the corresponding node controller state.
+     */
+    NodeControllerState getNodeControllerState(String nodeId);
+
+    /**
+     * Adds one node into the cluster.
+     *
+     * @param nodeId
+     *            the node id.
+     * @param ncState
+     *            the node controller state.
+     * @throws HyracksException
+     *             when the node has already been added or the IP address given in the node state is not valid.
+     */
+    void addNode(String nodeId, NodeControllerState ncState) throws HyracksException;
+
+    /**
+     * Removes one node from the cluster.
+     *
+     * @param nodeId
+     *            the node id.
+     * @throws HyracksException
+     *             when the IP address given in the node state is not valid
+     */
+    void removeNode(String nodeId) throws HyracksException;
+
+}