You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:39 UTC
[6/7] asterixdb git commit: Implements concurrent query management
support.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index 92bc076..774c4d9 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -46,22 +46,42 @@ public class AlgebricksException extends Exception {
return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public AlgebricksException(String message) {
this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public AlgebricksException(Throwable cause) {
this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public AlgebricksException(Throwable cause, String nodeId) {
this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public AlgebricksException(String message, Throwable cause, String nodeId) {
this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public AlgebricksException(String message, Throwable cause) {
this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
index 9446756..7328278 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFirstRuleCheckFixpointRuleController.java
@@ -59,7 +59,7 @@ public class SequentialFirstRuleCheckFixpointRuleController extends AbstractRule
if (ruleCollection instanceof List) {
rules = (List<IAlgebraicRewriteRule>) ruleCollection;
} else {
- throw AlgebricksException.create(ErrorCode.COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST,
+ throw AlgebricksException.create(ErrorCode.RULECOLLECTION_NOT_INSTANCE_OF_LIST,
this.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 7f132dd..2bb6cc2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -93,5 +93,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 8b83d83..5f4877d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
/**
* Application Context at the Cluster Controller for an application.
@@ -38,7 +39,7 @@ public interface ICCApplicationContext extends IApplicationContext {
* @param state
* The distributed state
*/
- public void setDistributedState(Serializable state);
+ void setDistributedState(Serializable state);
/**
* A listener that listens to Job Lifecycle events at the Cluster
@@ -46,21 +47,21 @@ public interface ICCApplicationContext extends IApplicationContext {
*
* @param jobLifecycleListener
*/
- public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+ void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
/**
* A listener that listens to Cluster Lifecycle events at the Cluster
* Controller.
*
- * @param jobLifecycleListener
+ * @param clusterLifecycleListener
*/
- public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+ void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
/**
* Get the Cluster Controller Context.
*
* @return The Cluster Controller Context.
*/
- public ICCContext getCCContext();
+ ICCContext getCCContext();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
index 9f7f222..c11cc7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+
public interface ICCApplicationEntryPoint {
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+ void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
- public void stop() throws Exception;
+ void stop() throws Exception;
void startupCompleted() throws Exception;
+
+ IJobCapacityController getJobCapacityController();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index a9bef18..191a4af 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.api.application;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -47,6 +48,6 @@ public interface IClusterLifecycleListener {
* @param deadNodeIds
* A set of Node Controller Ids that have left the cluster. The set is not cumulative.
*/
- public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException;
+ public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
index ea850c5..dea6e4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
public interface INCApplicationEntryPoint {
- public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+ void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
+
+ void notifyStartupComplete() throws Exception;
- public void notifyStartupComplete() throws Exception;
+ void stop() throws Exception;
- public void stop() throws Exception;
+ NodeCapacity getCapacity();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c90644f..57f389f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -35,16 +35,16 @@ public class NodeControllerInfo implements Serializable {
private final NetworkAddress messagingNetworkAddress;
- private final int numCores;
+ private final int numAvailableCores;
public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
- NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numCores) {
+ NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
this.datasetNetworkAddress = datasetNetworkAddress;
this.messagingNetworkAddress = messagingNetworkAddress;
- this.numCores = numCores;
+ this.numAvailableCores = numAvailableCores;
}
public String getNodeId() {
@@ -67,7 +67,7 @@ public class NodeControllerInfo implements Serializable {
return messagingNetworkAddress;
}
- public int getNumCores() {
- return numCores;
+ public int getNumAvailableCores() {
+ return numAvailableCores;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
acg.setFrameSize(spec.getFrameSize());
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
- acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
- acg.setReportTaskDetails(spec.isReportTaskDetails());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 5a67188..1ca8bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -27,19 +27,28 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
/**
* A registry of runtime/compile error codes
* Error code:
- * 0 --- 999: runtime errors
- * 1000 ---- 1999: compilation errors
+ * 0 --- 9999: runtime errors
+ * 10000 ---- 19999: compilation errors
*/
public class ErrorCode {
private static final String RESOURCE_PATH = "errormsg" + File.separator + "en.properties";
public static final String HYRACKS = "HYR";
+ // Runtime error codes.
public static final int INVALID_OPERATOR_OPERATION = 1;
public static final int ERROR_PROCESSING_TUPLE = 2;
public static final int FAILURE_ON_NODE = 3;
- public static final int RUNTIME_FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
- public static final int RUNTIME_FULLTEXT_PHRASE_FOUND = 5;
- public static final int COMPILATION_RULECOLLECTION_NOT_INSTANCE_OF_LIST = 1001;
+ public static final int FILE_WITH_ABSOULTE_PATH_NOT_WITHIN_ANY_IO_DEVICE = 4;
+ public static final int FULLTEXT_PHRASE_FOUND = 5;
+ public static final int JOB_QUEUE_FULL = 6;
+ public static final int INVALID_NETWORK_ADDRESS = 7;
+ public static final int INVALID_INPUT_PARAMETER = 8;
+ public static final int JOB_REQUIREMENTS_EXCEED_CAPACITY = 9;
+ public static final int NO_SUCH_NODE = 10;
+ public static final int CLASS_LOADING_ISSUE = 11;
+
+ // Compilation error codes.
+ public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
// Loads the map that maps error codes to error message templates.
private static Map<Integer, String> errorMessageMap = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 56be93e..0fd6923 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.api.exceptions;
import java.io.Serializable;
-import java.util.logging.Logger;
import org.apache.hyracks.api.util.ErrorMessageUtil;
@@ -27,14 +27,6 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
* The main execution time exception type for runtime errors in a hyracks environment
*/
public class HyracksDataException extends HyracksException {
- private static final long serialVersionUID = 1L;
-
- public static final int UNKNOWN = 0;
- private final String component;
- private final int errorCode;
- private final Serializable[] params;
- private final String nodeId;
- private transient volatile String msgCache;
public static HyracksDataException create(int code, Serializable... params) {
return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
@@ -46,76 +38,68 @@ public class HyracksDataException extends HyracksException {
public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
- super(message, cause);
- this.component = component;
- this.errorCode = errorCode;
- this.nodeId = nodeId;
- this.params = params;
+ super(component, errorCode, message, cause, nodeId, params);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksDataException(String message) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+ super(message);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksDataException(Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ super(cause);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksDataException(Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+ super(cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksDataException(String message, Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+ super(message, cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksDataException(String message, Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+ super(message, cause);
}
public HyracksDataException(String component, int errorCode, Serializable... params) {
- this(component, errorCode, null, null, null, params);
+ super(component, errorCode, null, null, null, params);
}
public HyracksDataException(Throwable cause, int errorCode, Serializable... params) {
- this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+ super(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
}
public HyracksDataException(String component, int errorCode, String message, Serializable... params) {
- this(component, errorCode, message, null, null, params);
+ super(component, errorCode, message, null, null, params);
}
public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) {
- this(component, errorCode, cause.getMessage(), cause, null, params);
+ super(component, errorCode, cause.getMessage(), cause, null, params);
}
public HyracksDataException(String component, int errorCode, String message, Throwable cause,
Serializable... params) {
- this(component, errorCode, message, cause, null, params);
- }
-
- public String getComponent() {
- return component;
- }
-
- public int getErrorCode() {
- return errorCode;
- }
-
- public Object[] getParams() {
- return params;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String getMessage() {
- if (msgCache == null) {
- msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
- }
- return msgCache;
+ super(component, errorCode, message, cause, null, params);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index e939d26..5d13212 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -19,22 +19,118 @@
package org.apache.hyracks.api.exceptions;
import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.util.ErrorMessageUtil;
public class HyracksException extends IOException {
private static final long serialVersionUID = 1L;
- public HyracksException() {
+ public static final int UNKNOWN = 0;
+ private final String component;
+ private final int errorCode;
+ private final Serializable[] params;
+ private final String nodeId;
+ private transient volatile String msgCache;
+
+ public static HyracksException create(int code, Serializable... params) {
+ return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
+ }
+
+ public static HyracksException create(int code, Throwable cause, Serializable... params) {
+ return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
}
+ public HyracksException(String component, int errorCode, String message, Throwable cause, String nodeId,
+ Serializable... params) {
+ super(message, cause);
+ this.component = component;
+ this.errorCode = errorCode;
+ this.nodeId = nodeId;
+ this.params = params;
+ }
+
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksException(String message) {
- super(message);
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksException(Throwable cause) {
- super(cause);
+ this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ }
+
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
+ public HyracksException(Throwable cause, String nodeId) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+ }
+
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
+ public HyracksException(String message, Throwable cause, String nodeId) {
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
}
+ /**
+ * @deprecated Error code is needed; use {@code create(...)} or a constructor that takes an error code instead.
+ */
+ @Deprecated
public HyracksException(String message, Throwable cause) {
- super(message, cause);
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+ }
+
+ public HyracksException(String component, int errorCode, Serializable... params) {
+ this(component, errorCode, null, null, null, params);
+ }
+
+ public HyracksException(Throwable cause, int errorCode, Serializable... params) {
+ this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, String message, Serializable... params) {
+ this(component, errorCode, message, null, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, Throwable cause, Serializable... params) {
+ this(component, errorCode, cause.getMessage(), cause, null, params);
+ }
+
+ public HyracksException(String component, int errorCode, String message, Throwable cause, Serializable... params) {
+ this(component, errorCode, message, cause, null, params);
+ }
+
+ public String getComponent() {
+ return component;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ public Object[] getParams() {
+ return params;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String getMessage() {
+ if (msgCache == null) {
+ msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
+ }
+ return msgCache;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..5787c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,10 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
@@ -44,6 +40,12 @@ import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
@@ -76,11 +78,9 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
private IJobletEventListenerFactory jobletEventListenerFactory;
- private IGlobalJobDataFactory globalJobDataFactory;
-
private boolean useConnectorPolicyForScheduling;
- private boolean reportTaskDetails;
+ private IClusterCapacity requiredClusterCapacity;
private transient int operatorIdCounter;
@@ -106,7 +106,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
connectorIdCounter = 0;
maxReattempts = 2;
useConnectorPolicyForScheduling = false;
- reportTaskDetails = true;
+ requiredClusterCapacity = new ClusterCapacity();
setFrameSize(frameSize);
}
@@ -281,14 +281,6 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
this.jobletEventListenerFactory = jobletEventListenerFactory;
}
- public IGlobalJobDataFactory getGlobalJobDataFactory() {
- return globalJobDataFactory;
- }
-
- public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
- this.globalJobDataFactory = globalJobDataFactory;
- }
-
public boolean isUseConnectorPolicyForScheduling() {
return useConnectorPolicyForScheduling;
}
@@ -297,12 +289,12 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
}
- public boolean isReportTaskDetails() {
- return reportTaskDetails;
+ public void setRequiredClusterCapacity(IClusterCapacity capacity) {
+ this.requiredClusterCapacity = capacity;
}
- public void setReportTaskDetails(boolean reportTaskDetails) {
- this.reportTaskDetails = reportTaskDetails;
+ public IClusterCapacity getRequiredClusterCapacity() {
+ return requiredClusterCapacity;
}
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..50db00d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.api.job;
public enum JobStatus {
- INITIALIZED,
+ PENDING,
RUNNING,
TERMINATED,
FAILURE,
+ FAILURE_BEFORE_EXECUTION
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
new file mode 100644
index 0000000..ded4b63
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+public class ClusterCapacity implements IClusterCapacity {
+
+ private long aggregatedMemoryByteSize = 0;
+ private int aggregatedCores = 0;
+ private final Map<String, Long> nodeMemoryMap = new HashMap<>();
+ private final Map<String, Integer> nodeCoreMap = new HashMap<>();
+
+ @Override
+ public long getAggregatedMemoryByteSize() {
+ return aggregatedMemoryByteSize;
+ }
+
+ @Override
+ public int getAggregatedCores() {
+ return aggregatedCores;
+ }
+
+ @Override
+ public long getMemoryByteSize(String nodeId) throws HyracksException {
+ if (!nodeMemoryMap.containsKey(nodeId)) {
+ throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+ }
+ return nodeMemoryMap.get(nodeId);
+ }
+
+ @Override
+ public int getCores(String nodeId) throws HyracksException {
+ if (!nodeCoreMap.containsKey(nodeId)) { // cores are tracked separately from memory
+ throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+ }
+ return nodeCoreMap.get(nodeId);
+ }
+
+ @Override
+ public void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize) {
+ this.aggregatedMemoryByteSize = aggregatedMemoryByteSize;
+ }
+
+ @Override
+ public void setAggregatedCores(int aggregatedCores) {
+ this.aggregatedCores = aggregatedCores;
+ }
+
+ @Override
+ public void setMemoryByteSize(String nodeId, long memoryByteSize) {
+ nodeMemoryMap.put(nodeId, memoryByteSize);
+ }
+
+ @Override
+ public void setCores(String nodeId, int cores) {
+ nodeCoreMap.put(nodeId, cores);
+ }
+
+ @Override
+ public void update(String nodeId, NodeCapacity nodeCapacity) throws HyracksException {
+ // Removes the existing node resource and the aggregated resource statistics.
+ if (nodeMemoryMap.containsKey(nodeId)) {
+ aggregatedMemoryByteSize -= nodeMemoryMap.remove(nodeId);
+ }
+ if (nodeCoreMap.containsKey(nodeId)) {
+ aggregatedCores -= nodeCoreMap.remove(nodeId);
+ }
+
+ long memorySize = nodeCapacity.getMemoryByteSize();
+ int cores = nodeCapacity.getCores();
+ // Updates the node capacity map when both memory size and cores are positive.
+ if (memorySize > 0 && cores > 0) {
+ aggregatedMemoryByteSize += memorySize;
+ aggregatedCores += cores;
+ nodeMemoryMap.put(nodeId, memorySize);
+ nodeCoreMap.put(nodeId, cores);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ObjectUtils.hashCodeMulti(aggregatedMemoryByteSize, aggregatedCores, nodeMemoryMap,
+ nodeCoreMap);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ClusterCapacity)) {
+ return false;
+ }
+ ClusterCapacity capacity = (ClusterCapacity) o;
+ return aggregatedMemoryByteSize == capacity.aggregatedMemoryByteSize
+ && aggregatedCores == capacity.aggregatedCores
+ && ObjectUtils.equals(nodeMemoryMap, capacity.nodeMemoryMap)
+ && ObjectUtils.equals(nodeCoreMap, capacity.nodeCoreMap);
+ }
+
+ @Override
+ public String toString() {
+ return "capacity (memory: " + aggregatedMemoryByteSize + " bytes, CPU cores: " + aggregatedCores + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
new file mode 100644
index 0000000..9e38a20
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DefaultJobCapacityController implements IJobCapacityController {
+
+ public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+
+ private DefaultJobCapacityController() {
+ }
+
+ @Override
+ public JobSubmissionStatus allocate(JobSpecification job) {
+ return JobSubmissionStatus.EXECUTE;
+ }
+
+ @Override
+ public void release(JobSpecification job) {
+ // No operation here.
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
new file mode 100644
index 0000000..ac3261d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface abstracts the mutable capacity for a cluster.
+ */
+public interface IClusterCapacity extends IReadOnlyClusterCapacity {
+
+ /**
+ * Sets the aggregated memory size for a cluster.
+ *
+ * @param aggregatedMemoryByteSize,
+ * the aggregated memory size.
+ */
+ void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+ /**
+ * Sets the aggregated number of CPU cores for a cluster.
+ *
+ * @param aggregatedCores,
+ * the total number of cores.
+ */
+ void setAggregatedCores(int aggregatedCores);
+
+ /**
+ * Sets the memory byte size (for computation) of a specific node.
+ *
+ * @param nodeId,
+ * the node id.
+ * @param memoryByteSize,
+ * the available memory byte size for computation of the node.
+ */
+ void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+ /**
+ * Sets the number of CPU cores for a specific node.
+ *
+ * @param nodeId,
+ * the node id.
+ * @param cores,
+ * the number of CPU cores for the node.
+ */
+ void setCores(String nodeId, int cores);
+
+ /**
+ * Updates the cluster capacity information with the capacity of one particular node.
+ *
+ * @param nodeId,
+ * the id of the node for updating.
+ * @param capacity,
+ * the capacity of one particular node.
+ * @throws HyracksException
+ * when the parameters are invalid.
+ */
+ void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
new file mode 100644
index 0000000..5fa4bd9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * This interface determines the behavior of a job when it is submitted to the job manager.
+ * The job could be one of the following three cases:
+ * -- rejected immediately because its capacity requirement exceeds the cluster's capacity.
+ * -- entered into a pending job queue for deferred execution, due to the current capacity limitation because of
+ * concurrent running jobs;
+ * -- executed immediately because there is sufficient capacity.
+ */
+public interface IJobCapacityController {
+
+ enum JobSubmissionStatus {
+ EXECUTE,
+ QUEUE
+ }
+
+ /**
+ * Allocates required cluster capacity for a job.
+ *
+ * @param job,
+ * the job specification.
+ * @return EXECUTE, if the job can be executed immediately;
+ * QUEUE, if the job cannot be executed immediately and has to wait in the pending job queue.
+ * @throws HyracksException
+ * if the job's capacity requirement exceeds the maximum capacity of the cluster.
+ */
+ JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+
+ /**
+ * Releases cluster capacity for a job when it completes.
+ *
+ * @param job,
+ * the job specification.
+ */
+ void release(JobSpecification job);
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
new file mode 100644
index 0000000..59b6bfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+/**
+ * This interface provides read-only methods for the capacity of a cluster.
+ */
+public interface IReadOnlyClusterCapacity extends Serializable {
+
+ /**
+ * @return the aggregated memory byte size for the cluster.
+ */
+ long getAggregatedMemoryByteSize();
+
+ /**
+ * @return the aggregated number of cores
+ */
+ int getAggregatedCores();
+
+ /**
+ * Retrieves the memory byte size for computation on a specific node.
+ * (Note that usually a portion of memory is used for storage.)
+ *
+ * @param nodeId,
+ * the node id.
+ * @return the memory byte size for computation on the node.
+ * @throws HyracksException
+ * when the input node does not exist.
+ */
+ long getMemoryByteSize(String nodeId) throws HyracksException;
+
+ /**
+ * Retrieves the number of CPU cores for computation on a specific node.
+ *
+ * @param nodeId,
+ * the node id.
+ * @return the number of CPU cores for computation on the node.
+ * @throws HyracksException
+ * when the input node does not exist.
+ */
+ int getCores(String nodeId) throws HyracksException;
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
new file mode 100644
index 0000000..7902e7d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/NodeCapacity.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+/**
+ * Specifies the capacity for computation on a particular node, i.e., an NCDriver process.
+ */
+public class NodeCapacity implements Serializable {
+
+ // All memory for computations -- this is not changed during the lifetime of a running instance.
+ private final long memoryByteSize;
+
+ // All CPU cores -- currently we assume that it doesn't change during the lifetime of a running instance.
+ // Otherwise, for each heartbeat, we will have to update global cluster capacity of a cluster.
+ private final int cores;
+
+ /**
+ * NOTE: neither parameter can be negative.
+ * However, both of them can be zero, which means the node is to be removed from the cluster.
+ *
+ * @param memorySize,
+ * the memory size of the node.
+ * @param cores,
+ * the number of cores of the node.
+ */
+ public NodeCapacity(long memorySize, int cores) {
+ this.memoryByteSize = memorySize;
+ this.cores = cores;
+ }
+
+ public long getMemoryByteSize() {
+ return memoryByteSize;
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 52367ee..d17c9aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -17,9 +17,18 @@
# under the License.
#
-1 = Unsupported operation %1$s in %2$s operator
-2 = Error in processing tuple %1$s in a frame
-4 = The file with absolute path %1$s is not within any of the current IO devices
-5 = Phrase search in Full-text is not supported. An expression should include only one word
+# 0 --- 9999: runtime errors
+# 10000 ---- 19999: compilation errors
-1001 = The given rule collection %1$s is not an instance of the List class.
\ No newline at end of file
+1=Unsupported operation %1$s in %2$s operator
+2=Error in processing tuple %1$s in a frame
+4=The file with absolute path %1$s is not within any of the current IO devices
+5=Phrase search in Full-text is not supported. An expression should include only one word
+6=Job queue is full with %1$s jobs
+7=Network address cannot be resolved -- %1$s
+8=Invalid internal input parameter
+9=Job requirement %1$s exceeds capacity %2$s
+10=Node %1$s does not exist
+11=Class loading issue: %1$s
+
+10000 = The given rule collection %1$s is not an instance of the List class.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
new file mode 100644
index 0000000..277e8e2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/resource/ClusterCapacityTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClusterCapacityTest {
+
+ @Test
+ public void test() throws HyracksException {
+ ClusterCapacity capacity = new ClusterCapacity();
+ String nodeId = "node1";
+
+ // Adds one node.
+ capacity.update(nodeId, new NodeCapacity(1024L, 8));
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 1024L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 8);
+
+ // Updates the node.
+ capacity.update(nodeId, new NodeCapacity(-1L, -2));
+
+ // Verifies that node is removed
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 0);
+
+ boolean nodeNotExist = false;
+ try {
+ capacity.getMemoryByteSize(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ nodeNotExist = false;
+ try {
+ capacity.getCores(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+
+ // Adds the node again.
+ capacity.update(nodeId, new NodeCapacity(1024L, 8));
+ // Updates the node.
+ capacity.update(nodeId, new NodeCapacity(4L, 0));
+
+ // Verifies that node does not exist
+ Assert.assertTrue(capacity.getAggregatedMemoryByteSize() == 0L);
+ Assert.assertTrue(capacity.getAggregatedCores() == 0);
+ nodeNotExist = false;
+ try {
+ capacity.getMemoryByteSize(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ nodeNotExist = false;
+ try {
+ capacity.getCores(nodeId);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index e949e57..105d47f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -113,5 +113,16 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 26beb63..7ea5f70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -72,13 +72,13 @@ class ClientInterfaceIPCI implements IIPCI {
case GET_JOB_STATUS:
HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
(HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
- ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, gjsf.getJobId(),
- new IPCResponder<JobStatus>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+ new GetJobStatusWork(ccs.getJobManager(), gjsf.getJobId(), new IPCResponder<>(handle, mid)));
break;
case GET_JOB_INFO:
HyracksClientInterfaceFunctions.GetJobInfoFunction gjif =
(HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
- ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(),
+ ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
break;
case START_JOB:
@@ -118,8 +118,8 @@ class ClientInterfaceIPCI implements IIPCI {
new IPCResponder<>(handle, mid)));
break;
case GET_NODE_CONTROLLERS_INFO:
- ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
- new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+ new GetNodeControllersInfoWork(ccs.getNodeManager(), new IPCResponder<>(handle, mid)));
break;
case GET_CLUSTER_TOPOLOGY:
try {
@@ -149,7 +149,8 @@ class ClientInterfaceIPCI implements IIPCI {
case GET_NODE_DETAILS_JSON:
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
- ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, gndjf.getNodeId(),
+ ccs.getWorkQueue()
+ .schedule(new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), gndjf.getNodeId(),
gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
break;
case THREAD_DUMP:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index b6c9a08..21fcf92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -68,7 +68,7 @@ class ClusterControllerIPCI implements IIPCI {
break;
case UNREGISTER_NODE:
CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
- ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, unf.getNodeId()));
+ ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs.getNodeManager(), unf.getNodeId()));
break;
case NODE_HEARTBEAT:
CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
@@ -87,7 +87,7 @@ class ClusterControllerIPCI implements IIPCI {
break;
case REPORT_PROFILE:
CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
- ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, rpf.getProfiles()));
+ ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs.getJobManager(), rpf.getProfiles()));
break;
case NOTIFY_TASK_COMPLETE:
CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
@@ -137,7 +137,7 @@ class ClusterControllerIPCI implements IIPCI {
rsf.getDeploymentId(), rsf.getNodeId()));
break;
case GET_NODE_CONTROLLERS_INFO:
- ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+ ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
new IResultCallback<Map<String, NodeControllerInfo>>() {
@Override
public void setValue(Map<String, NodeControllerInfo> result) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..1a363c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,12 +20,12 @@ package org.apache.hyracks.control.cc;
import java.io.File;
import java.io.FileReader;
+import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -43,17 +43,22 @@ import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
@@ -83,10 +88,6 @@ public class ClusterControllerService implements IControllerService {
private final LogFile jobLog;
- private final Map<String, NodeControllerState> nodeRegistry;
-
- private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
-
private final ServerContext serverCtx;
private final WebServer webServer;
@@ -95,12 +96,6 @@ public class ClusterControllerService implements IControllerService {
private CCApplicationContext appCtx;
- private final Map<JobId, JobRun> activeRunMap;
-
- private final Map<JobId, JobRun> runMapArchive;
-
- private final Map<JobId, List<Exception>> runMapHistory;
-
private final WorkQueue workQueue;
private ExecutorService executor;
@@ -119,6 +114,12 @@ public class ClusterControllerService implements IControllerService {
private final Map<String, ThreadDumpRun> threadDumpRunMap;
+ private final INodeManager nodeManager;
+
+ private final IResourceManager resourceManager = new ResourceManager();
+
+ private IJobManager jobManager;
+
private ShutdownRun shutdownCallback;
private ICCApplicationEntryPoint aep;
@@ -127,8 +128,6 @@ public class ClusterControllerService implements IControllerService {
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
jobLog = new LogFile(jobLogFolder);
- nodeRegistry = new LinkedHashMap<>();
- ipAddressNodeNameMap = new HashMap<>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
IIPCI ccIPCI = new ClusterControllerIPCI(this);
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -137,25 +136,7 @@ public class ClusterControllerService implements IControllerService {
clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
- activeRunMap = new HashMap<>();
- runMapArchive = new LinkedHashMap<JobId, JobRun>() {
- private static final long serialVersionUID = 1L;
- @Override
- protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
- return size() > ccConfig.jobHistorySize;
- }
- };
- runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
- private static final long serialVersionUID = 1L;
- /** history size + 1 is for the case when history size = 0 */
- private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
- return size() > allowedSize;
- }
- };
// WorkQueue is in charge of heartbeat as well as other events.
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
this.timer = new Timer(true);
@@ -167,6 +148,9 @@ public class ClusterControllerService implements IControllerService {
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
+
+ // Node manager is in charge of cluster membership management.
+ nodeManager = new NodeManager(ccConfig, resourceManager);
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -207,13 +191,22 @@ public class ClusterControllerService implements IControllerService {
appCtx.addJobLifecycleListener(datasetDirectoryService);
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
String className = ccConfig.appCCMainClass;
+
+ IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
if (className != null) {
Class<?> c = Class.forName(className);
aep = (ICCApplicationEntryPoint) c.newInstance();
String[] args = ccConfig.appArgs == null ? null
: ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
aep.start(appCtx, args);
+ jobCapacityController = aep.getJobCapacityController();
}
+
+ // Job manager is in charge of job lifecycle management.
+ Constructor<?> jobManagerConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobManagerClassName)
+ .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+ jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
+
}
private void connectNCs() throws Exception {
@@ -301,20 +294,16 @@ public class ClusterControllerService implements IControllerService {
return ccContext;
}
- public Map<JobId, JobRun> getActiveRunMap() {
- return activeRunMap;
- }
-
- public Map<JobId, JobRun> getRunMapArchive() {
- return runMapArchive;
+ public IJobManager getJobManager() {
+ return jobManager;
}
- public Map<JobId, List<Exception>> getRunHistory() {
- return runMapHistory;
+ public INodeManager getNodeManager() {
+ return nodeManager;
}
- public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
- return ipAddressNodeNameMap;
+ public IResourceManager getResourceManager() {
+ return resourceManager;
}
public LogFile getJobLogFile() {
@@ -329,10 +318,6 @@ public class ClusterControllerService implements IControllerService {
return executor;
}
- public Map<String, NodeControllerState> getNodeMap() {
- return nodeRegistry;
- }
-
public CCConfig getConfig() {
return ccConfig;
}
@@ -366,7 +351,8 @@ public class ClusterControllerService implements IControllerService {
@Override
public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
- GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
+ GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
+ ClusterControllerService.this.getNodeManager(), map);
try {
workQueue.scheduleAndSync(ginmw);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index bf94dff..955b7f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -25,10 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -36,6 +35,9 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class NodeControllerState {
private static final int RRD_SIZE = 720;
@@ -141,7 +143,7 @@ public class NodeControllerState {
private int lastHeartbeatDuration;
- private int numCores;
+ private NodeCapacity capacity;
public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
this.nodeController = nodeController;
@@ -204,7 +206,7 @@ public class NodeControllerState {
diskWrites = new long[RRD_SIZE];
rrdPtr = 0;
- numCores = 0;
+ capacity = reg.getCapacity();
}
public synchronized void notifyHeartbeat(HeartbeatData hbData) {
@@ -242,7 +244,6 @@ public class NodeControllerState {
diskReads[rrdPtr] = hbData.diskReads;
diskWrites[rrdPtr] = hbData.diskWrites;
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
- numCores = hbData.numCores;
}
}
@@ -250,10 +251,6 @@ public class NodeControllerState {
return lastHeartbeatDuration++;
}
- public int getLastHeartbeatDuration() {
- return lastHeartbeatDuration;
- }
-
public INodeController getNodeController() {
return nodeController;
}
@@ -277,8 +274,9 @@ public class NodeControllerState {
public NetworkAddress getMessagingPort() {
return messagingPort;
}
- public int getNumCores() {
- return numCores;
+
+ public NodeCapacity getCapacity() {
+ return capacity;
}
public synchronized ObjectNode toSummaryJSON() {
@@ -324,6 +322,8 @@ public class NodeControllerState {
o.putPOJO("nonheap-used-sizes", nonheapUsedSize);
o.putPOJO("nonheap-committed-sizes", nonheapCommittedSize);
o.putPOJO("nonheap-max-sizes", nonheapMaxSize);
+ o.putPOJO("application-memory-budget", capacity.getMemoryByteSize());
+ o.putPOJO("application-cpu-core-budget", capacity.getCores());
o.putPOJO("thread-counts", threadCount);
o.putPOJO("peak-thread-counts", peakThreadCount);
o.putPOJO("system-load-averages", systemLoadAverage);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
index 680c2a7..443fa88 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/IndexPage.java
@@ -42,7 +42,7 @@ public class IndexPage extends AbstractPage {
public IndexPage() throws Exception {
ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
- GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
+ GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs.getNodeManager());
ccs.getWorkQueue().scheduleAndSync(gnse);
ArrayNode nodeSummaries = gnse.getSummaries();
add(new Label("node-count", String.valueOf(nodeSummaries.size())));
@@ -63,7 +63,7 @@ public class IndexPage extends AbstractPage {
};
add(nodeList);
- GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
+ GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager());
ccs.getWorkQueue().scheduleAndSync(gjse);
ArrayNode jobSummaries = gjse.getSummaries();
ListView<JsonNode> jobList = new ListView<JsonNode>("jobs-list", Lists.newArrayList(jobSummaries.iterator())) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index abc07d9..68add85 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -79,7 +79,7 @@ public class JobDetailsPage extends AbstractPage {
}
}
- GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs, jobId);
+ GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs.getJobManager(), jobId);
ccs.getWorkQueue().scheduleAndSync(gjrw);
Label jobrun = new Label("job-run", gjrw.getJSON().toString());
jobrun.setEscapeModelStrings(false);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dd6f83b..e43a59d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.cc.application;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -110,7 +111,7 @@ public class CCApplicationContext extends ApplicationContext implements ICCAppli
}
}
- public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
+ public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
for (IClusterLifecycleListener l : clusterLifecycleListeners) {
l.notifyNodeFailure(deadNodeIds);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
new file mode 100644
index 0000000..40d81f8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.cluster;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.NodeControllerState;
+
+/**
+ * This interface provides abstractions for a node manager, which manages the node membership in a cluster.
+ */
+public interface INodeManager {
+
+ /**
+ * A callback that is invoked with the id and the node controller state of a node.
+ */
+ @FunctionalInterface
+ interface NodeFunction {
+ void apply(String nodeId, NodeControllerState ncState);
+ }
+
+ /**
+ * Applies a function to each node in the cluster.
+ *
+ * @param nodeFunction
+ * the function to apply, an implementation of the {@link NodeFunction} interface.
+ */
+ void apply(NodeFunction nodeFunction);
+
+ /**
+ * @return all node ids.
+ */
+ Collection<String> getAllNodeIds();
+
+ /**
+ * @return all node controller states.
+ */
+ Collection<NodeControllerState> getAllNodeControllerStates();
+
+ /**
+ * @return the map that maps an IP address to a set of node names.
+ */
+ Map<InetAddress, Set<String>> getIpAddressNodeNameMap();
+
+ /**
+ * @return the map that maps a node id to its corresponding node controller info.
+ */
+ Map<String, NodeControllerInfo> getNodeControllerInfoMap();
+
+ /**
+ * Removes all nodes that are considered "dead", i.e., those that have run out of heartbeats.
+ *
+ * @return a pair of all dead nodes (left) and their impacted jobs (right).
+ * @throws HyracksException
+ * when any IP address given in the dead nodes is not valid
+ */
+ Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException;
+
+ /**
+ * Retrieves the node controller state for a given node id.
+ *
+ * @param nodeId
+ * the given node id.
+ * @return the corresponding node controller state.
+ */
+ NodeControllerState getNodeControllerState(String nodeId);
+
+ /**
+ * Adds one node into the cluster.
+ *
+ * @param nodeId
+ * the node id.
+ * @param ncState
+ * the node controller state.
+ * @throws HyracksException
+ * when the node has already been added or the IP address given in the node state is not valid.
+ */
+ void addNode(String nodeId, NodeControllerState ncState) throws HyracksException;
+
+ /**
+ * Removes one node from the cluster.
+ *
+ * @param nodeId
+ * the node id.
+ * @throws HyracksException
+ * when the IP address given in the node state is not valid
+ */
+ void removeNode(String nodeId) throws HyracksException;
+
+}