You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:37 UTC
[4/7] asterixdb git commit: Implements concurrent query management
support.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
new file mode 100644
index 0000000..52ad301
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.scheduler.IJobQueue;
+import org.apache.hyracks.control.cc.work.JobCleanupWork;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+// Job manager manages all jobs that haven been submitted to the cluster.
+public class JobManager implements IJobManager {
+
+ private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
+
+ private final ClusterControllerService ccs;
+ private final IJobQueue jobQueue;
+ private final Map<JobId, JobRun> activeRunMap;
+ private final Map<JobId, JobRun> runMapArchive;
+ private final Map<JobId, List<Exception>> runMapHistory;
+ private final IJobCapacityController jobCapacityController;
+
+ public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
+ throws HyracksException {
+ this.ccs = ccs;
+ this.jobCapacityController = jobCapacityController;
+ try {
+ Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+ .getConstructor(IJobManager.class, IJobCapacityController.class);
+ jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw HyracksException.create(ErrorCode.CLASS_LOADING_ISSUE, e, e.getMessage());
+ }
+ activeRunMap = new HashMap<>();
+ runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
+ return size() > ccConfig.jobHistorySize;
+ }
+ };
+ runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+ private static final long serialVersionUID = 1L;
+ /** history size + 1 is for the case when history size = 0 */
+ private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
+ return size() > allowedSize;
+ }
+ };
+ }
+
+ @Override
+ public void add(JobRun jobRun) throws HyracksException {
+ checkJob(jobRun);
+ JobSpecification job = jobRun.getActivityClusterGraphFactory().getJobSpecification();
+ IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+ switch (status) {
+ case QUEUE:
+ jobRun.setStatus(JobStatus.PENDING, null);
+ jobQueue.add(jobRun);
+ break;
+ case EXECUTE:
+ executeJob(jobRun);
+ break;
+ }
+ }
+
+ @Override
+ public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
+ checkJob(run);
+ if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
+ run.setPendingStatus(JobStatus.FAILURE, exceptions);
+ finalComplete(run);
+ return;
+ }
+ JobId jobId = run.getJobId();
+ HyracksException caughtException = null;
+ if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+ finalComplete(run);
+ return;
+ }
+ if (run.getPendingStatus() != null) {
+ LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
+ return;
+ }
+ Set<String> targetNodes = run.getParticipatingNodeIds();
+ run.getCleanupPendingNodeIds().addAll(targetNodes);
+ if (run.getPendingStatus() != JobStatus.FAILURE && run.getPendingStatus() != JobStatus.TERMINATED) {
+ run.setPendingStatus(status, exceptions);
+ }
+
+ if (targetNodes != null && !targetNodes.isEmpty()) {
+ INodeManager nodeManager = ccs.getNodeManager();
+ Set<String> toDelete = new HashSet<>();
+ for (String n : targetNodes) {
+ NodeControllerState ncs = nodeManager.getNodeControllerState(n);
+ try {
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ if (caughtException == null) {
+ caughtException = new HyracksException(e);
+ } else {
+ caughtException.addSuppressed(e);
+ }
+ }
+ }
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ finalComplete(run);
+ }
+ } else {
+ finalComplete(run);
+ }
+
+ // throws caught exceptions if any
+ if (caughtException != null) {
+ throw caughtException;
+ }
+ }
+
+ @Override
+ public void finalComplete(JobRun run) throws HyracksException {
+ checkJob(run);
+ JobId jobId = run.getJobId();
+ HyracksException caughtException = null;
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ caughtException = e;
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ run.setEndTime(System.currentTimeMillis());
+ activeRunMap.remove(jobId);
+ runMapArchive.put(jobId, run);
+ runMapHistory.put(jobId, run.getExceptions());
+
+ if (run.getActivityClusterGraph().isReportTaskDetails()) {
+ /**
+ * log job details when profiling is enabled
+ */
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ if (caughtException == null) {
+ caughtException = new HyracksException(e);
+ } else {
+ caughtException.addSuppressed(e);
+ }
+ }
+ }
+
+ // Releases cluster capacitys occupied by the job.
+ JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+ jobCapacityController.release(job);
+
+ // Picks the next job to execute.
+ pickJobsToRun();
+
+ // throws caught exceptions if any
+ if (caughtException != null) {
+ throw caughtException;
+ }
+ }
+
+
+
+ @Override
+ public Collection<JobRun> getRunningJobs() {
+ return activeRunMap.values();
+ }
+
+ @Override
+ public Collection<JobRun> getPendingJobs() {
+ return jobQueue.jobs();
+ }
+
+ @Override
+ public Collection<JobRun> getArchivedJobs() {
+ return runMapArchive.values();
+ }
+
+ @Override
+ public JobRun get(JobId jobId) {
+ JobRun jobRun = activeRunMap.get(jobId);
+ if (jobRun == null) {
+ jobRun = runMapArchive.get(jobId);
+ }
+ return jobRun;
+ }
+
+ @Override
+ public List<Exception> getRunHistory(JobId jobId) {
+ return runMapHistory.get(jobId);
+ }
+
+ private void pickJobsToRun() {
+ List<JobRun> selectedRuns = jobQueue.pull();
+ for (JobRun run : selectedRuns) {
+ executeJob(run);
+ }
+ }
+
+ // Executes a job when the required capacity for the job is met.
+ private void executeJob(JobRun run) {
+ IResultCallback<JobId> callback = run.getCallback();
+ try {
+ run.setStartTime(System.currentTimeMillis());
+ JobId jobId = run.getJobId();
+ activeRunMap.put(jobId, run);
+
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ IActivityClusterGraphGeneratorFactory acggf = run.getActivityClusterGraphFactory();
+ appCtx.notifyJobCreation(jobId, acggf);
+ run.setStatus(JobStatus.RUNNING, null);
+ executeJobInternal(run);
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+
+ private void executeJobInternal(JobRun run) {
+ try {
+ run.getExecutor().startJob();
+ } catch (Exception e) {
+ ccs.getWorkQueue()
+ .schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
+ Collections.singletonList(e)));
+ }
+ }
+
+ private ObjectNode createJobLogObject(final JobRun run) {
+ ObjectMapper om = new ObjectMapper();
+ ObjectNode jobLogObject = om.createObjectNode();
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.set("activity-cluster-graph", acg.toJSON());
+ jobLogObject.set("job-run", run.toJSON());
+ return jobLogObject;
+ }
+
+ private void checkJob(JobRun jobRun) throws HyracksException {
+ if (jobRun == null) {
+ throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index f1d04bb..5682194 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,17 +20,12 @@ package org.apache.hyracks.control.cc.job;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -42,29 +37,37 @@ import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.ActivityClusterId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
+import org.apache.hyracks.control.cc.executor.JobExecutor;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
-import org.apache.hyracks.control.cc.scheduler.ActivityPartitionDetails;
-import org.apache.hyracks.control.cc.scheduler.JobScheduler;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class JobRun implements IJobStatusConditionVariable {
private final DeploymentId deploymentId;
private final JobId jobId;
+ private final IActivityClusterGraphGeneratorFactory acggf;
+
private final IActivityClusterGraphGenerator acgg;
private final ActivityClusterGraph acg;
- private final JobScheduler scheduler;
+ private final JobExecutor scheduler;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
@@ -94,21 +97,26 @@ public class JobRun implements IJobStatusConditionVariable {
private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+ private final IResultCallback<JobId> callback;
+
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
- IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
+ IResultCallback<JobId> callback) {
this.deploymentId = deploymentId;
this.jobId = jobId;
+ this.acggf = acggf;
this.acgg = acgg;
this.acg = acgg.initialize();
- this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+ this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
this.jobFlags = jobFlags;
- activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
+ this.callback = callback;
+ activityClusterPlanMap = new HashMap<>();
pmm = new PartitionMatchMaker();
- participatingNodeIds = new HashSet<String>();
- cleanupPendingNodeIds = new HashSet<String>();
+ participatingNodeIds = new HashSet<>();
+ cleanupPendingNodeIds = new HashSet<>();
profile = new JobProfile(jobId);
- connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+ connectorPolicyMap = new HashMap<>();
+ operatorLocations = new HashMap<>();
createTime = System.currentTimeMillis();
}
@@ -120,11 +128,15 @@ public class JobRun implements IJobStatusConditionVariable {
return jobId;
}
+ public IActivityClusterGraphGeneratorFactory getActivityClusterGraphFactory() {
+ return acggf;
+ }
+
public ActivityClusterGraph getActivityClusterGraph() {
return acg;
}
- public EnumSet<JobFlag> getFlags() {
+ public Set<JobFlag> getFlags() {
return jobFlags;
}
@@ -167,8 +179,8 @@ public class JobRun implements IJobStatusConditionVariable {
return createTime;
}
- public void setCreateTime(long createTime) {
- this.createTime = createTime;
+ public IResultCallback<JobId> getCallback() {
+ return callback;
}
public long getStartTime() {
@@ -228,7 +240,7 @@ public class JobRun implements IJobStatusConditionVariable {
return profile;
}
- public JobScheduler getScheduler() {
+ public JobExecutor getExecutor() {
return scheduler;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index cca56fc..3a5e3be 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc.partitions;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -27,7 +28,6 @@ import java.util.Set;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
@@ -156,7 +156,7 @@ public class PartitionMatchMaker {
}
}
- public void notifyNodeFailures(final Set<String> deadNodes) {
+ public void notifyNodeFailures(final Collection<String> deadNodes) {
removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
@Override
public boolean matches(PartitionDescriptor o) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
index 807798e..65851ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.control.cc.partitions;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -34,8 +35,9 @@ public class PartitionUtils {
PartitionDescriptor desc = match.getLeft();
PartitionRequest req = match.getRight();
- NodeControllerState producerNCS = ccs.getNodeMap().get(desc.getNodeId());
- NodeControllerState requestorNCS = ccs.getNodeMap().get(req.getNodeId());
+ INodeManager nodeManager = ccs.getNodeManager();
+ NodeControllerState producerNCS = nodeManager.getNodeControllerState(desc.getNodeId());
+ NodeControllerState requestorNCS = nodeManager.getNodeControllerState(req.getNodeId());
final NetworkAddress dataport = producerNCS.getDataPort();
final INodeController requestorNC = requestorNCS.getNodeController();
requestorNC.reportPartitionAvailability(pid, dataport);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
deleted file mode 100644
index c13a458..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityCluster;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
-import org.apache.hyracks.control.cc.job.ActivityPlan;
-import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.cc.job.Task;
-import org.apache.hyracks.control.cc.job.TaskCluster;
-import org.apache.hyracks.control.cc.job.TaskClusterId;
-
-public class ActivityClusterPlanner {
- private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
-
- private final JobScheduler scheduler;
-
- private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
-
- public ActivityClusterPlanner(JobScheduler newJobScheduler) {
- this.scheduler = newJobScheduler;
- partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
- }
-
- public ActivityClusterPlan planActivityCluster(ActivityCluster ac) throws HyracksException {
- JobRun jobRun = scheduler.getJobRun();
- Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
-
- Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
-
- assignConnectorPolicy(ac, activityPlanMap);
-
- TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Plan for " + ac);
- LOGGER.info("Built " + taskClusters.length + " Task Clusters");
- for (TaskCluster tc : taskClusters) {
- LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
- }
- }
-
- return new ActivityClusterPlan(taskClusters, activityPlanMap);
- }
-
- private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
- Map<ActivityId, ActivityPartitionDetails> pcMap) {
- Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
- Set<ActivityId> depAnIds = new HashSet<ActivityId>();
- for (ActivityId anId : ac.getActivityMap().keySet()) {
- depAnIds.clear();
- getDependencyActivityIds(depAnIds, anId, ac);
- ActivityPartitionDetails apd = pcMap.get(anId);
- Task[] tasks = new Task[apd.getPartitionCount()];
- ActivityPlan activityPlan = new ActivityPlan(apd);
- for (int i = 0; i < tasks.length; ++i) {
- TaskId tid = new TaskId(anId, i);
- tasks[i] = new Task(tid, activityPlan);
- for (ActivityId danId : depAnIds) {
- ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
- ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
- assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
- + danId;
- Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
- assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
- + danId;
- assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
- + dATasks.length + " != " + tasks.length;
- Task dTask = dATasks[i];
- TaskId dTaskId = dTask.getTaskId();
- tasks[i].getDependencies().add(dTaskId);
- dTask.getDependents().add(tid);
- }
- }
- activityPlan.setTasks(tasks);
- activityPlanMap.put(anId, activityPlan);
- }
- return activityPlanMap;
- }
-
- private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
- Map<ActivityId, ActivityPlan> activityPlanMap) {
- Set<ActivityId> activities = ac.getActivityMap().keySet();
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
- activityPlanMap, activities);
-
- TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling() ? buildConnectorPolicyAwareTaskClusters(
- ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
-
- for (TaskCluster tc : taskClusters) {
- Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
- for (Task ts : tc.getTasks()) {
- TaskId tid = ts.getTaskId();
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
- if (cInfoList != null) {
- for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
- Task targetTS = activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft()
- .getPartition()];
- TaskCluster targetTC = targetTS.getTaskCluster();
- if (targetTC != tc) {
- ConnectorDescriptorId cdId = p.getRight();
- PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(), p.getLeft()
- .getPartition());
- tc.getProducedPartitions().add(pid);
- targetTC.getRequiredPartitions().add(pid);
- partitionProducingTaskClusterMap.put(pid, tc);
- }
- }
- }
-
- for (TaskId dTid : ts.getDependencies()) {
- TaskCluster dTC = getTaskCluster(dTid);
- dTC.getDependentTaskClusters().add(tc);
- tcDependencyTaskClusters.add(dTC);
- }
- }
- }
- return taskClusters;
- }
-
- private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
- Map<ActivityId, ActivityPlan> activityPlanMap) {
- List<Task> taskStates = new ArrayList<Task>();
- for (ActivityId anId : ac.getActivityMap().keySet()) {
- ActivityPlan ap = activityPlanMap.get(anId);
- Task[] tasks = ap.getTasks();
- for (Task t : tasks) {
- taskStates.add(t);
- }
- }
- TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
- .size()]));
- for (Task t : tc.getTasks()) {
- t.setTaskCluster(tc);
- }
- return new TaskCluster[] { tc };
- }
-
- private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
- Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
- ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
- BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- ActivityCluster ac = acg.getActivityMap().get(ac1);
- Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(ac1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = ac.getConsumerActivity(cdId);
- Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
- int nConsumers = ac2TaskStates.length;
- if (c.allProducersToAllConsumers()) {
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- for (int j = 0; j < nConsumers; j++) {
- TaskId targetTID = ac2TaskStates[j].getTaskId();
- cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
- }
- for (int i = 0; i < nProducers; ++i) {
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- } else {
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
- TaskId targetTID = ac2TaskStates[j].getTaskId();
- cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
- }
- }
- }
- }
- }
- }
- return taskConnectivity;
- }
-
- private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
- Map<ActivityId, ActivityPlan> activityPlanMap,
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
- Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
- for (ActivityId anId : ac.getActivityMap().keySet()) {
- ActivityPlan ap = activityPlanMap.get(anId);
- Task[] tasks = ap.getTasks();
- for (Task t : tasks) {
- Set<TaskId> cluster = new HashSet<TaskId>();
- TaskId tid = t.getTaskId();
- cluster.add(tid);
- taskClusterMap.put(tid, cluster);
- }
- }
-
- JobRun jobRun = scheduler.getJobRun();
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
- for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
- Set<TaskId> cluster = taskClusterMap.get(e.getKey());
- for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
- IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
- if (cPolicy.requiresProducerConsumerCoscheduling()) {
- cluster.add(p.getLeft());
- }
- }
- }
-
- /*
- * taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
- * We compute the transitive closure of this relation to find the largest set of
- * tasks that need to be co-scheduled
- */
- int counter = 0;
- TaskId[] ordinalList = new TaskId[taskClusterMap.size()];
- Map<TaskId, Integer> ordinalMap = new HashMap<TaskId, Integer>();
- for (TaskId tid : taskClusterMap.keySet()) {
- ordinalList[counter] = tid;
- ordinalMap.put(tid, counter);
- ++counter;
- }
-
- int n = ordinalList.length;
- BitSet[] paths = new BitSet[n];
- for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
- int i = ordinalMap.get(e.getKey());
- BitSet bsi = paths[i];
- if (bsi == null) {
- bsi = new BitSet(n);
- paths[i] = bsi;
- }
- for (TaskId ttid : e.getValue()) {
- int j = ordinalMap.get(ttid);
- paths[i].set(j);
- BitSet bsj = paths[j];
- if (bsj == null) {
- bsj = new BitSet(n);
- paths[j] = bsj;
- }
- bsj.set(i);
- }
- }
- for (int k = 0; k < n; ++k) {
- for (int i = paths[k].nextSetBit(0); i >= 0; i = paths[k].nextSetBit(i + 1)) {
- for (int j = paths[i].nextClearBit(0); j < n && j >= 0; j = paths[i].nextClearBit(j + 1)) {
- paths[i].set(j, paths[k].get(j));
- paths[j].set(i, paths[i].get(j));
- }
- }
- }
- BitSet pending = new BitSet(n);
- pending.set(0, n);
- List<List<TaskId>> clusters = new ArrayList<List<TaskId>>();
- for (int i = pending.nextSetBit(0); i >= 0; i = pending.nextSetBit(i)) {
- List<TaskId> cluster = new ArrayList<TaskId>();
- for (int j = paths[i].nextSetBit(0); j >= 0; j = paths[i].nextSetBit(j + 1)) {
- cluster.add(ordinalList[j]);
- pending.clear(j);
- }
- clusters.add(cluster);
- }
-
- List<TaskCluster> tcSet = new ArrayList<TaskCluster>();
- counter = 0;
- for (List<TaskId> cluster : clusters) {
- List<Task> taskStates = new ArrayList<Task>();
- for (TaskId tid : cluster) {
- taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
- }
- TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), counter++), ac,
- taskStates.toArray(new Task[taskStates.size()]));
- tcSet.add(tc);
- for (TaskId tid : cluster) {
- activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()].setTaskCluster(tc);
- }
- }
- TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
- return taskClusters;
- }
-
- private TaskCluster getTaskCluster(TaskId tid) {
- JobRun run = scheduler.getJobRun();
- ActivityCluster ac = run.getActivityClusterGraph().getActivityMap().get(tid.getActivityId());
- ActivityClusterPlan acp = run.getActivityClusterPlanMap().get(ac.getId());
- Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
- Task task = tasks[tid.getPartition()];
- assert task.getTaskId().equals(tid);
- return task.getTaskCluster();
- }
-
- private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId, ActivityCluster ac) {
- Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(anId);
- if (blockers != null) {
- depAnIds.addAll(blockers);
- }
- }
-
- private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
- Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- Set<ActivityId> activities = ac.getActivityMap().keySet();
- BitSet targetBitmap = new BitSet();
- for (ActivityId a1 : activities) {
- Task[] ac1TaskStates = taskMap.get(a1).getTasks();
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(a1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId a2 = ac.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskMap.get(a2).getTasks();
- int nConsumers = ac2TaskStates.length;
-
- int[] fanouts = new int[nProducers];
- if (c.allProducersToAllConsumers()) {
- for (int i = 0; i < nProducers; ++i) {
- fanouts[i] = nConsumers;
- }
- } else {
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- fanouts[i] = targetBitmap.cardinality();
- }
- }
- IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
- cPolicyMap.put(cdId, cp);
- }
- }
- }
- scheduler.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
- }
-
- private IConnectorPolicy assignConnectorPolicy(ActivityCluster ac, IConnectorDescriptor c, int nProducers,
- int nConsumers, int[] fanouts) {
- IConnectorPolicyAssignmentPolicy cpap = ac.getConnectorPolicyAssignmentPolicy();
- if (cpap != null) {
- return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
- }
- cpap = ac.getActivityClusterGraph().getConnectorPolicyAssignmentPolicy();
- if (cpap != null) {
- return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
- }
- return new PipeliningConnectorPolicy();
- }
-
- private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
- throws HyracksException {
- PartitionConstraintSolver solver = scheduler.getSolver();
- Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
- for (ActivityId anId : ac.getActivityMap().keySet()) {
- lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
- }
- solver.solve(lValues);
- Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
- for (LValueConstraintExpression lv : lValues) {
- Object value = solver.getValue(lv);
- if (value == null) {
- throw new HyracksException("No value found for " + lv);
- }
- if (!(value instanceof Number)) {
- throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
- + value + ")");
- }
- int nParts = ((Number) value).intValue();
- if (nParts <= 0) {
- throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
- }
- nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
- }
- Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
- for (ActivityId anId : ac.getActivityMap().keySet()) {
- int nParts = nPartMap.get(anId.getOperatorDescriptorId());
- int[] nInputPartitions = null;
- List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(anId);
- if (inputs != null) {
- nInputPartitions = new int[inputs.size()];
- for (int i = 0; i < nInputPartitions.length; ++i) {
- ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
- ActivityId aid = ac.getProducerActivity(cdId);
- Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
- nInputPartitions[i] = nPartInt;
- }
- }
- int[] nOutputPartitions = null;
- List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(anId);
- if (outputs != null) {
- nOutputPartitions = new int[outputs.size()];
- for (int i = 0; i < nOutputPartitions.length; ++i) {
- ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
- ActivityId aid = ac.getConsumerActivity(cdId);
- Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
- nOutputPartitions[i] = nPartInt;
- }
- }
- ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
- activityPartsMap.put(anId, apd);
- }
- return activityPartsMap;
- }
-
- public Map<? extends PartitionId, ? extends TaskCluster> getPartitionProducingTaskClusterMap() {
- return partitionProducingTaskClusterMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
deleted file mode 100644
index 97b459c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.Arrays;
-
-public class ActivityPartitionDetails {
- private final int nPartitions;
-
- private final int[] nInputPartitions;
-
- private final int[] nOutputPartitions;
-
- public ActivityPartitionDetails(int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
- this.nPartitions = nPartitions;
- this.nInputPartitions = nInputPartitions;
- this.nOutputPartitions = nOutputPartitions;
- }
-
- public int getPartitionCount() {
- return nPartitions;
- }
-
- public int[] getInputPartitionCounts() {
- return nInputPartitions;
- }
-
- public int[] getOutputPartitionCounts() {
- return nOutputPartitions;
- }
-
- @Override
- public String toString() {
- return nPartitions + ":" + (nInputPartitions == null ? "[]" : Arrays.toString(nInputPartitions)) + ":"
- + (nOutputPartitions == null ? "[]" : Arrays.toString(nOutputPartitions));
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
new file mode 100644
index 0000000..eac9800
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
+
+/**
+ * An implementation of IJobQueue that gives more priority to jobs that are submitted earlier.
+ */
+public class FIFOJobQueue implements IJobQueue {
+
+ private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName());
+
+ private static final int CAPACITY = 4096;
+ private final List<JobRun> jobQueue = new LinkedList<>();
+ private final IJobManager jobManager;
+ private final IJobCapacityController jobCapacityController;
+
+ public FIFOJobQueue(IJobManager jobManager, IJobCapacityController jobCapacityController) {
+ this.jobManager = jobManager;
+ this.jobCapacityController = jobCapacityController;
+ }
+
+ @Override
+ public void add(JobRun run) throws HyracksException {
+ int size = jobQueue.size();
+ if (size >= CAPACITY) {
+ throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, new Integer(CAPACITY));
+ }
+ jobQueue.add(run);
+ }
+
+ @Override
+ public List<JobRun> pull() {
+ List<JobRun> jobRuns = new ArrayList<>();
+ Iterator<JobRun> runIterator = jobQueue.iterator();
+ while (runIterator.hasNext()) {
+ JobRun run = runIterator.next();
+ JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+ // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
+ // or not.
+ try {
+ IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+ // Checks if the job can be executed immediately.
+ if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
+ jobRuns.add(run);
+ runIterator.remove(); // Removes the selected job.
+ }
+ } catch (HyracksException exception) {
+ // The required capacity exceeds maximum capacity.
+ List<Exception> exceptions = new ArrayList<>();
+ exceptions.add(exception);
+ runIterator.remove(); // Removes the job from the queue.
+ try {
+ // Fails the job.
+ jobManager.prepareComplete(run, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ } catch (HyracksException e) {
+ LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ }
+ continue;
+ }
+ }
+ return jobRuns;
+ }
+
+ @Override
+ public Collection<JobRun> jobs() {
+ return Collections.unmodifiableCollection(jobQueue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
new file mode 100644
index 0000000..2c26799
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.job.JobRun;
+
+/**
+ * This interface specifies a job queue.
+ */
+public interface IJobQueue {
+
+ /**
+ * Adds a job into the job queue.
+ *
+ * @param run,
+ * the descriptor of a job.
+ * @throws HyracksException
+ * when the size of the queue exceeds its capacity.
+ */
+ void add(JobRun run) throws HyracksException;
+
+ /**
+ * Pull a list of jobs from the job queque, when more cluster capacity becomes available.
+ *
+ * @return a list of jobs whose capacity requirements can all be met at the same time.
+ */
+ List<JobRun> pull();
+
+ /**
+ * @return all pending jobs in the queue.
+ */
+ Collection<JobRun> jobs();
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
new file mode 100644
index 0000000..a4ac9e7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
+/**
+ * This interface abstracts the resource management of a cluster.
+ */
+public interface IResourceManager {
+
+ /**
+ * @return the maximum capacity of the cluster, assuming that there is no running job
+ * that occupies capacity.
+ */
+ IReadOnlyClusterCapacity getMaximumCapacity();
+
+ /**
+ * @return the current capacity for computation.
+ */
+ IClusterCapacity getCurrentCapacity();
+
+ /**
+ * Updates the cluster capacity when a node is added, removed, or updated.
+ *
+ * @param nodeId,
+ * the id of the node for updating.
+ * @param capacity,
+ * the capacity of one particular node.
+ * @throws HyracksException
+ * when the parameters are invalid.
+ */
+ void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}