Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/25 14:46:37 UTC

[4/7] asterixdb git commit: Implements concurrent query management support.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
new file mode 100644
index 0000000..52ad301
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.scheduler.IJobQueue;
+import org.apache.hyracks.control.cc.work.JobCleanupWork;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+// Job manager manages all jobs that have been submitted to the cluster.
+public class JobManager implements IJobManager {
+
+    private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
+
+    private final ClusterControllerService ccs;
+    private final IJobQueue jobQueue;
+    private final Map<JobId, JobRun> activeRunMap;
+    private final Map<JobId, JobRun> runMapArchive;
+    private final Map<JobId, List<Exception>> runMapHistory;
+    private final IJobCapacityController jobCapacityController;
+
+    public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
+            throws HyracksException {
+        this.ccs = ccs;
+        this.jobCapacityController = jobCapacityController;
+        try {
+            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+                    .getConstructor(IJobManager.class, IJobCapacityController.class);
+            jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
+                | InvocationTargetException e) {
+            throw HyracksException.create(ErrorCode.CLASS_LOADING_ISSUE, e, e.getMessage());
+        }
+        activeRunMap = new HashMap<>();
+        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
+                return size() > ccConfig.jobHistorySize;
+            }
+        };
+        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+            private static final long serialVersionUID = 1L;
+            /** history size + 1 is for the case when history size = 0 */
+            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
+                return size() > allowedSize;
+            }
+        };
+    }
+
+    @Override
+    public void add(JobRun jobRun) throws HyracksException {
+        checkJob(jobRun);
+        JobSpecification job = jobRun.getActivityClusterGraphFactory().getJobSpecification();
+        IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+        switch (status) {
+            case QUEUE:
+                jobRun.setStatus(JobStatus.PENDING, null);
+                jobQueue.add(jobRun);
+                break;
+            case EXECUTE:
+                executeJob(jobRun);
+                break;
+        }
+    }
+
+    @Override
+    public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
+        checkJob(run);
+        if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
+            run.setPendingStatus(JobStatus.FAILURE, exceptions);
+            finalComplete(run);
+            return;
+        }
+        JobId jobId = run.getJobId();
+        HyracksException caughtException = null;
+        if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+            finalComplete(run);
+            return;
+        }
+        if (run.getPendingStatus() != null) {
+            LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
+            return;
+        }
+        Set<String> targetNodes = run.getParticipatingNodeIds();
+        run.getCleanupPendingNodeIds().addAll(targetNodes);
+        if (run.getPendingStatus() != JobStatus.FAILURE && run.getPendingStatus() != JobStatus.TERMINATED) {
+            run.setPendingStatus(status, exceptions);
+        }
+
+        if (targetNodes != null && !targetNodes.isEmpty()) {
+            INodeManager nodeManager = ccs.getNodeManager();
+            Set<String> toDelete = new HashSet<>();
+            for (String n : targetNodes) {
+                NodeControllerState ncs = nodeManager.getNodeControllerState(n);
+                try {
+                    if (ncs == null) {
+                        toDelete.add(n);
+                    } else {
+                        ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                    if (caughtException == null) {
+                        caughtException = new HyracksException(e);
+                    } else {
+                        caughtException.addSuppressed(e);
+                    }
+                }
+            }
+            targetNodes.removeAll(toDelete);
+            run.getCleanupPendingNodeIds().removeAll(toDelete);
+            if (run.getCleanupPendingNodeIds().isEmpty()) {
+                finalComplete(run);
+            }
+        } else {
+            finalComplete(run);
+        }
+
+        // Throws the caught exception, if any.
+        if (caughtException != null) {
+            throw caughtException;
+        }
+    }
+
+    @Override
+    public void finalComplete(JobRun run) throws HyracksException {
+        checkJob(run);
+        JobId jobId = run.getJobId();
+        HyracksException caughtException = null;
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
+            try {
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                caughtException = e;
+            }
+        }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        run.setEndTime(System.currentTimeMillis());
+        activeRunMap.remove(jobId);
+        runMapArchive.put(jobId, run);
+        runMapHistory.put(jobId, run.getExceptions());
+
+        if (run.getActivityClusterGraph().isReportTaskDetails()) {
+            /**
+             * log job details when profiling is enabled
+             */
+            try {
+                ccs.getJobLogFile().log(createJobLogObject(run));
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                if (caughtException == null) {
+                    caughtException = new HyracksException(e);
+                } else {
+                    caughtException.addSuppressed(e);
+                }
+            }
+        }
+
+        // Releases the cluster capacity occupied by the job.
+        JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+        jobCapacityController.release(job);
+
+        // Picks the next job to execute.
+        pickJobsToRun();
+
+        // Throws the caught exception, if any.
+        if (caughtException != null) {
+            throw caughtException;
+        }
+    }
+
+
+    @Override
+    public Collection<JobRun> getRunningJobs() {
+        return activeRunMap.values();
+    }
+
+    @Override
+    public Collection<JobRun> getPendingJobs() {
+        return jobQueue.jobs();
+    }
+
+    @Override
+    public Collection<JobRun> getArchivedJobs() {
+        return runMapArchive.values();
+    }
+
+    @Override
+    public JobRun get(JobId jobId) {
+        JobRun jobRun = activeRunMap.get(jobId);
+        if (jobRun == null) {
+            jobRun = runMapArchive.get(jobId);
+        }
+        return jobRun;
+    }
+
+    @Override
+    public List<Exception> getRunHistory(JobId jobId) {
+        return runMapHistory.get(jobId);
+    }
+
+    private void pickJobsToRun() {
+        List<JobRun> selectedRuns = jobQueue.pull();
+        for (JobRun run : selectedRuns) {
+            executeJob(run);
+        }
+    }
+
+    // Executes a job when the required capacity for the job is met.
+    private void executeJob(JobRun run) {
+        IResultCallback<JobId> callback = run.getCallback();
+        try {
+            run.setStartTime(System.currentTimeMillis());
+            JobId jobId = run.getJobId();
+            activeRunMap.put(jobId, run);
+
+            CCApplicationContext appCtx = ccs.getApplicationContext();
+            IActivityClusterGraphGeneratorFactory acggf = run.getActivityClusterGraphFactory();
+            appCtx.notifyJobCreation(jobId, acggf);
+            run.setStatus(JobStatus.RUNNING, null);
+            executeJobInternal(run);
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+
+    private void executeJobInternal(JobRun run) {
+        try {
+            run.getExecutor().startJob();
+        } catch (Exception e) {
+            ccs.getWorkQueue()
+                    .schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
+                            Collections.singletonList(e)));
+        }
+    }
+
+    private ObjectNode createJobLogObject(final JobRun run) {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jobLogObject = om.createObjectNode();
+        ActivityClusterGraph acg = run.getActivityClusterGraph();
+        jobLogObject.set("activity-cluster-graph", acg.toJSON());
+        jobLogObject.set("job-run", run.toJSON());
+        return jobLogObject;
+    }
+
+    private void checkJob(JobRun jobRun) throws HyracksException {
+        if (jobRun == null) {
+            throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        }
+    }
+
+}
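
The two history maps above (runMapArchive and runMapHistory) bound their size with the LinkedHashMap.removeEldestEntry hook. As an aside, here is a minimal, self-contained sketch of that eviction idiom; the BoundedHistoryDemo class and the limit of 3 are hypothetical and stand in for ccConfig.jobHistorySize:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BoundedHistoryDemo {
        public static void main(String[] args) {
            final int limit = 3; // stands in for ccConfig.jobHistorySize
            Map<Integer, String> history = new LinkedHashMap<Integer, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                    // Invoked after each put(); returning true evicts the oldest entry.
                    return size() > limit;
                }
            };
            for (int jobId = 0; jobId < 5; jobId++) {
                history.put(jobId, "job-" + jobId);
            }
            System.out.println(history.keySet()); // prints [2, 3, 4]
        }
    }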

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index f1d04bb..5682194 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,17 +20,12 @@ package org.apache.hyracks.control.cc.job;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -42,29 +37,37 @@ import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
+import org.apache.hyracks.control.cc.executor.JobExecutor;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
-import org.apache.hyracks.control.cc.scheduler.ActivityPartitionDetails;
-import org.apache.hyracks.control.cc.scheduler.JobScheduler;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JobRun implements IJobStatusConditionVariable {
     private final DeploymentId deploymentId;
 
     private final JobId jobId;
 
+    private final IActivityClusterGraphGeneratorFactory acggf;
+
     private final IActivityClusterGraphGenerator acgg;
 
     private final ActivityClusterGraph acg;
 
-    private final JobScheduler scheduler;
+    private final JobExecutor scheduler;
 
-    private final EnumSet<JobFlag> jobFlags;
+    private final Set<JobFlag> jobFlags;
 
     private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
 
@@ -94,21 +97,26 @@ public class JobRun implements IJobStatusConditionVariable {
 
     private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
+    private final IResultCallback<JobId> callback;
+
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
-            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+            IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
+            IResultCallback<JobId> callback) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
+        this.acggf = acggf;
         this.acgg = acgg;
         this.acg = acgg.initialize();
-        this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
-        activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
+        this.callback = callback;
+        activityClusterPlanMap = new HashMap<>();
         pmm = new PartitionMatchMaker();
-        participatingNodeIds = new HashSet<String>();
-        cleanupPendingNodeIds = new HashSet<String>();
+        participatingNodeIds = new HashSet<>();
+        cleanupPendingNodeIds = new HashSet<>();
         profile = new JobProfile(jobId);
-        connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+        connectorPolicyMap = new HashMap<>();
+        operatorLocations = new HashMap<>();
         createTime = System.currentTimeMillis();
     }
 
@@ -120,11 +128,15 @@ public class JobRun implements IJobStatusConditionVariable {
         return jobId;
     }
 
+    public IActivityClusterGraphGeneratorFactory getActivityClusterGraphFactory() {
+        return acggf;
+    }
+
     public ActivityClusterGraph getActivityClusterGraph() {
         return acg;
     }
 
-    public EnumSet<JobFlag> getFlags() {
+    public Set<JobFlag> getFlags() {
         return jobFlags;
     }
 
@@ -167,8 +179,8 @@ public class JobRun implements IJobStatusConditionVariable {
         return createTime;
     }
 
-    public void setCreateTime(long createTime) {
-        this.createTime = createTime;
+    public IResultCallback<JobId> getCallback() {
+        return callback;
     }
 
     public long getStartTime() {
@@ -228,7 +240,7 @@ public class JobRun implements IJobStatusConditionVariable {
         return profile;
     }
 
-    public JobScheduler getScheduler() {
+    public JobExecutor getExecutor() {
         return scheduler;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index cca56fc..3a5e3be 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc.partitions;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -27,7 +28,6 @@ import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
@@ -156,7 +156,7 @@ public class PartitionMatchMaker {
         }
     }
 
-    public void notifyNodeFailures(final Set<String> deadNodes) {
+    public void notifyNodeFailures(final Collection<String> deadNodes) {
         removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
             @Override
             public boolean matches(PartitionDescriptor o) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
index 807798e..65851ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.control.cc.partitions;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -34,8 +35,9 @@ public class PartitionUtils {
         PartitionDescriptor desc = match.getLeft();
         PartitionRequest req = match.getRight();
 
-        NodeControllerState producerNCS = ccs.getNodeMap().get(desc.getNodeId());
-        NodeControllerState requestorNCS = ccs.getNodeMap().get(req.getNodeId());
+        INodeManager nodeManager = ccs.getNodeManager();
+        NodeControllerState producerNCS = nodeManager.getNodeControllerState(desc.getNodeId());
+        NodeControllerState requestorNCS = nodeManager.getNodeControllerState(req.getNodeId());
         final NetworkAddress dataport = producerNCS.getDataPort();
         final INodeController requestorNC = requestorNCS.getNodeController();
         requestorNC.reportPartitionAvailability(pid, dataport);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
deleted file mode 100644
index c13a458..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityCluster;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
-import org.apache.hyracks.control.cc.job.ActivityPlan;
-import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.cc.job.Task;
-import org.apache.hyracks.control.cc.job.TaskCluster;
-import org.apache.hyracks.control.cc.job.TaskClusterId;
-
-public class ActivityClusterPlanner {
-    private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
-
-    private final JobScheduler scheduler;
-
-    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
-
-    public ActivityClusterPlanner(JobScheduler newJobScheduler) {
-        this.scheduler = newJobScheduler;
-        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
-    }
-
-    public ActivityClusterPlan planActivityCluster(ActivityCluster ac) throws HyracksException {
-        JobRun jobRun = scheduler.getJobRun();
-        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
-
-        Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
-
-        assignConnectorPolicy(ac, activityPlanMap);
-
-        TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Plan for " + ac);
-            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
-            for (TaskCluster tc : taskClusters) {
-                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
-            }
-        }
-
-        return new ActivityClusterPlan(taskClusters, activityPlanMap);
-    }
-
-    private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
-            Map<ActivityId, ActivityPartitionDetails> pcMap) {
-        Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
-        Set<ActivityId> depAnIds = new HashSet<ActivityId>();
-        for (ActivityId anId : ac.getActivityMap().keySet()) {
-            depAnIds.clear();
-            getDependencyActivityIds(depAnIds, anId, ac);
-            ActivityPartitionDetails apd = pcMap.get(anId);
-            Task[] tasks = new Task[apd.getPartitionCount()];
-            ActivityPlan activityPlan = new ActivityPlan(apd);
-            for (int i = 0; i < tasks.length; ++i) {
-                TaskId tid = new TaskId(anId, i);
-                tasks[i] = new Task(tid, activityPlan);
-                for (ActivityId danId : depAnIds) {
-                    ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
-                    ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
-                    assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
-                            + danId;
-                    Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
-                    assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
-                            + danId;
-                    assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
-                            + dATasks.length + " != " + tasks.length;
-                    Task dTask = dATasks[i];
-                    TaskId dTaskId = dTask.getTaskId();
-                    tasks[i].getDependencies().add(dTaskId);
-                    dTask.getDependents().add(tid);
-                }
-            }
-            activityPlan.setTasks(tasks);
-            activityPlanMap.put(anId, activityPlan);
-        }
-        return activityPlanMap;
-    }
-
-    private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
-            Map<ActivityId, ActivityPlan> activityPlanMap) {
-        Set<ActivityId> activities = ac.getActivityMap().keySet();
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
-                activityPlanMap, activities);
-
-        TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling() ? buildConnectorPolicyAwareTaskClusters(
-                ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
-
-        for (TaskCluster tc : taskClusters) {
-            Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
-            for (Task ts : tc.getTasks()) {
-                TaskId tid = ts.getTaskId();
-                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
-                if (cInfoList != null) {
-                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
-                        Task targetTS = activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft()
-                                .getPartition()];
-                        TaskCluster targetTC = targetTS.getTaskCluster();
-                        if (targetTC != tc) {
-                            ConnectorDescriptorId cdId = p.getRight();
-                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(), p.getLeft()
-                                    .getPartition());
-                            tc.getProducedPartitions().add(pid);
-                            targetTC.getRequiredPartitions().add(pid);
-                            partitionProducingTaskClusterMap.put(pid, tc);
-                        }
-                    }
-                }
-
-                for (TaskId dTid : ts.getDependencies()) {
-                    TaskCluster dTC = getTaskCluster(dTid);
-                    dTC.getDependentTaskClusters().add(tc);
-                    tcDependencyTaskClusters.add(dTC);
-                }
-            }
-        }
-        return taskClusters;
-    }
-
-    private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
-            Map<ActivityId, ActivityPlan> activityPlanMap) {
-        List<Task> taskStates = new ArrayList<Task>();
-        for (ActivityId anId : ac.getActivityMap().keySet()) {
-            ActivityPlan ap = activityPlanMap.get(anId);
-            Task[] tasks = ap.getTasks();
-            for (Task t : tasks) {
-                taskStates.add(t);
-            }
-        }
-        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
-                .size()]));
-        for (Task t : tc.getTasks()) {
-            t.setTaskCluster(tc);
-        }
-        return new TaskCluster[] { tc };
-    }
-
-    private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
-            Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
-        ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            ActivityCluster ac = acg.getActivityMap().get(ac1);
-            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(ac1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = ac.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
-                    int nConsumers = ac2TaskStates.length;
-                    if (c.allProducersToAllConsumers()) {
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                        for (int j = 0; j < nConsumers; j++) {
-                            TaskId targetTID = ac2TaskStates[j].getTaskId();
-                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
-                        }
-                        for (int i = 0; i < nProducers; ++i) {
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                    } else {
-                        for (int i = 0; i < nProducers; ++i) {
-                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                            List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                    .getTaskId());
-                            if (cInfoList == null) {
-                                cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                                taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                            }
-                            for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
-                                TaskId targetTID = ac2TaskStates[j].getTaskId();
-                                cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return taskConnectivity;
-    }
-
-    private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
-            Map<ActivityId, ActivityPlan> activityPlanMap,
-            Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
-        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-        for (ActivityId anId : ac.getActivityMap().keySet()) {
-            ActivityPlan ap = activityPlanMap.get(anId);
-            Task[] tasks = ap.getTasks();
-            for (Task t : tasks) {
-                Set<TaskId> cluster = new HashSet<TaskId>();
-                TaskId tid = t.getTaskId();
-                cluster.add(tid);
-                taskClusterMap.put(tid, cluster);
-            }
-        }
-
-        JobRun jobRun = scheduler.getJobRun();
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
-        for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
-            Set<TaskId> cluster = taskClusterMap.get(e.getKey());
-            for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
-                IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
-                if (cPolicy.requiresProducerConsumerCoscheduling()) {
-                    cluster.add(p.getLeft());
-                }
-            }
-        }
-
-        /*
-         * taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
-         * We compute the transitive closure of this relation to find the largest set of
-         * tasks that need to be co-scheduled
-         */
-        int counter = 0;
-        TaskId[] ordinalList = new TaskId[taskClusterMap.size()];
-        Map<TaskId, Integer> ordinalMap = new HashMap<TaskId, Integer>();
-        for (TaskId tid : taskClusterMap.keySet()) {
-            ordinalList[counter] = tid;
-            ordinalMap.put(tid, counter);
-            ++counter;
-        }
-
-        int n = ordinalList.length;
-        BitSet[] paths = new BitSet[n];
-        for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
-            int i = ordinalMap.get(e.getKey());
-            BitSet bsi = paths[i];
-            if (bsi == null) {
-                bsi = new BitSet(n);
-                paths[i] = bsi;
-            }
-            for (TaskId ttid : e.getValue()) {
-                int j = ordinalMap.get(ttid);
-                paths[i].set(j);
-                BitSet bsj = paths[j];
-                if (bsj == null) {
-                    bsj = new BitSet(n);
-                    paths[j] = bsj;
-                }
-                bsj.set(i);
-            }
-        }
-        for (int k = 0; k < n; ++k) {
-            for (int i = paths[k].nextSetBit(0); i >= 0; i = paths[k].nextSetBit(i + 1)) {
-                for (int j = paths[i].nextClearBit(0); j < n && j >= 0; j = paths[i].nextClearBit(j + 1)) {
-                    paths[i].set(j, paths[k].get(j));
-                    paths[j].set(i, paths[i].get(j));
-                }
-            }
-        }
-        BitSet pending = new BitSet(n);
-        pending.set(0, n);
-        List<List<TaskId>> clusters = new ArrayList<List<TaskId>>();
-        for (int i = pending.nextSetBit(0); i >= 0; i = pending.nextSetBit(i)) {
-            List<TaskId> cluster = new ArrayList<TaskId>();
-            for (int j = paths[i].nextSetBit(0); j >= 0; j = paths[i].nextSetBit(j + 1)) {
-                cluster.add(ordinalList[j]);
-                pending.clear(j);
-            }
-            clusters.add(cluster);
-        }
-
-        List<TaskCluster> tcSet = new ArrayList<TaskCluster>();
-        counter = 0;
-        for (List<TaskId> cluster : clusters) {
-            List<Task> taskStates = new ArrayList<Task>();
-            for (TaskId tid : cluster) {
-                taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
-            }
-            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), counter++), ac,
-                    taskStates.toArray(new Task[taskStates.size()]));
-            tcSet.add(tc);
-            for (TaskId tid : cluster) {
-                activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()].setTaskCluster(tc);
-            }
-        }
-        TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
-        return taskClusters;
-    }
-
-    private TaskCluster getTaskCluster(TaskId tid) {
-        JobRun run = scheduler.getJobRun();
-        ActivityCluster ac = run.getActivityClusterGraph().getActivityMap().get(tid.getActivityId());
-        ActivityClusterPlan acp = run.getActivityClusterPlanMap().get(ac.getId());
-        Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
-        Task task = tasks[tid.getPartition()];
-        assert task.getTaskId().equals(tid);
-        return task.getTaskCluster();
-    }
-
-    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId, ActivityCluster ac) {
-        Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(anId);
-        if (blockers != null) {
-            depAnIds.addAll(blockers);
-        }
-    }
-
-    private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
-        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        Set<ActivityId> activities = ac.getActivityMap().keySet();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId a1 : activities) {
-            Task[] ac1TaskStates = taskMap.get(a1).getTasks();
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(a1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId a2 = ac.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskMap.get(a2).getTasks();
-                    int nConsumers = ac2TaskStates.length;
-
-                    int[] fanouts = new int[nProducers];
-                    if (c.allProducersToAllConsumers()) {
-                        for (int i = 0; i < nProducers; ++i) {
-                            fanouts[i] = nConsumers;
-                        }
-                    } else {
-                        for (int i = 0; i < nProducers; ++i) {
-                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                            fanouts[i] = targetBitmap.cardinality();
-                        }
-                    }
-                    IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
-                    cPolicyMap.put(cdId, cp);
-                }
-            }
-        }
-        scheduler.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
-    }
-
-    private IConnectorPolicy assignConnectorPolicy(ActivityCluster ac, IConnectorDescriptor c, int nProducers,
-            int nConsumers, int[] fanouts) {
-        IConnectorPolicyAssignmentPolicy cpap = ac.getConnectorPolicyAssignmentPolicy();
-        if (cpap != null) {
-            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
-        }
-        cpap = ac.getActivityClusterGraph().getConnectorPolicyAssignmentPolicy();
-        if (cpap != null) {
-            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
-        }
-        return new PipeliningConnectorPolicy();
-    }
-
-    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
-            throws HyracksException {
-        PartitionConstraintSolver solver = scheduler.getSolver();
-        Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
-        for (ActivityId anId : ac.getActivityMap().keySet()) {
-            lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
-        }
-        solver.solve(lValues);
-        Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
-        for (LValueConstraintExpression lv : lValues) {
-            Object value = solver.getValue(lv);
-            if (value == null) {
-                throw new HyracksException("No value found for " + lv);
-            }
-            if (!(value instanceof Number)) {
-                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
-                        + value + ")");
-            }
-            int nParts = ((Number) value).intValue();
-            if (nParts <= 0) {
-                throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
-            }
-            nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
-        }
-        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
-        for (ActivityId anId : ac.getActivityMap().keySet()) {
-            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
-            int[] nInputPartitions = null;
-            List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(anId);
-            if (inputs != null) {
-                nInputPartitions = new int[inputs.size()];
-                for (int i = 0; i < nInputPartitions.length; ++i) {
-                    ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
-                    ActivityId aid = ac.getProducerActivity(cdId);
-                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
-                    nInputPartitions[i] = nPartInt;
-                }
-            }
-            int[] nOutputPartitions = null;
-            List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(anId);
-            if (outputs != null) {
-                nOutputPartitions = new int[outputs.size()];
-                for (int i = 0; i < nOutputPartitions.length; ++i) {
-                    ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
-                    ActivityId aid = ac.getConsumerActivity(cdId);
-                    Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
-                    nOutputPartitions[i] = nPartInt;
-                }
-            }
-            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
-            activityPartsMap.put(anId, apd);
-        }
-        return activityPartsMap;
-    }
-
-    public Map<? extends PartitionId, ? extends TaskCluster> getPartitionProducingTaskClusterMap() {
-        return partitionProducingTaskClusterMap;
-    }
-}
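
The block comment in the deleted planner above describes taking the transitive closure of the "must be co-scheduled" relation over BitSet rows. A standalone sketch of just that closure step, under the same symmetric-relation assumption (the CoScheduleClosureDemo class and its four-task example are hypothetical, not the deleted code itself):

    import java.util.BitSet;

    public class CoScheduleClosureDemo {
        public static void main(String[] args) {
            int n = 4;
            BitSet[] reach = new BitSet[n];
            for (int i = 0; i < n; i++) {
                reach[i] = new BitSet(n);
                reach[i].set(i); // every task is co-scheduled with itself
            }
            // Pairwise co-scheduling constraints: 0-1 and 1-2; task 3 stays alone.
            reach[0].set(1); reach[1].set(0);
            reach[1].set(2); reach[2].set(1);

            // Warshall-style closure: if i reaches k, i also reaches everything k reaches.
            for (int k = 0; k < n; k++) {
                for (int i = 0; i < n; i++) {
                    if (reach[i].get(k)) {
                        reach[i].or(reach[k]);
                    }
                }
            }
            for (int i = 0; i < n; i++) {
                // prints {0, 1, 2} for tasks 0..2 and {3} for task 3
                System.out.println("task " + i + " co-schedule set: " + reach[i]);
            }
        }
    }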

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
deleted file mode 100644
index 97b459c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.scheduler;
-
-import java.util.Arrays;
-
-public class ActivityPartitionDetails {
-    private final int nPartitions;
-
-    private final int[] nInputPartitions;
-
-    private final int[] nOutputPartitions;
-
-    public ActivityPartitionDetails(int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
-        this.nPartitions = nPartitions;
-        this.nInputPartitions = nInputPartitions;
-        this.nOutputPartitions = nOutputPartitions;
-    }
-
-    public int getPartitionCount() {
-        return nPartitions;
-    }
-
-    public int[] getInputPartitionCounts() {
-        return nInputPartitions;
-    }
-
-    public int[] getOutputPartitionCounts() {
-        return nOutputPartitions;
-    }
-
-    @Override
-    public String toString() {
-        return nPartitions + ":" + (nInputPartitions == null ? "[]" : Arrays.toString(nInputPartitions)) + ":"
-                + (nOutputPartitions == null ? "[]" : Arrays.toString(nOutputPartitions));
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
new file mode 100644
index 0000000..eac9800
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
+
+/**
+ * An implementation of IJobQueue that gives higher priority to jobs that are submitted earlier.
+ */
+public class FIFOJobQueue implements IJobQueue {
+
+    private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName());
+
+    private static final int CAPACITY = 4096;
+    private final List<JobRun> jobQueue = new LinkedList<>();
+    private final IJobManager jobManager;
+    private final IJobCapacityController jobCapacityController;
+
+    public FIFOJobQueue(IJobManager jobManager, IJobCapacityController jobCapacityController) {
+        this.jobManager = jobManager;
+        this.jobCapacityController = jobCapacityController;
+    }
+
+    @Override
+    public void add(JobRun run) throws HyracksException {
+        int size = jobQueue.size();
+        if (size >= CAPACITY) {
+            throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, CAPACITY);
+        }
+        jobQueue.add(run);
+    }
+
+    @Override
+    public List<JobRun> pull() {
+        List<JobRun> jobRuns = new ArrayList<>();
+        Iterator<JobRun> runIterator = jobQueue.iterator();
+        while (runIterator.hasNext()) {
+            JobRun run = runIterator.next();
+            JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+            // The cluster's maximum capacity can change over time, so we have to re-check whether
+            // each queued job should be rejected.
+            try {
+                IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+                // Checks if the job can be executed immediately.
+                if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
+                    jobRuns.add(run);
+                    runIterator.remove(); // Removes the selected job.
+                }
+            } catch (HyracksException exception) {
+                // The required capacity exceeds maximum capacity.
+                List<Exception> exceptions = new ArrayList<>();
+                exceptions.add(exception);
+                runIterator.remove(); // Removes the job from the queue.
+                try {
+                    // Fails the job.
+                    jobManager.prepareComplete(run, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                } catch (HyracksException e) {
+                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
+                }
+                continue;
+            }
+        }
+        return jobRuns;
+    }
+
+    @Override
+    public Collection<JobRun> jobs() {
+        return Collections.unmodifiableCollection(jobQueue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
new file mode 100644
index 0000000..2c26799
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.job.JobRun;
+
+/**
+ * This interface specifies a job queue.
+ */
+public interface IJobQueue {
+
+    /**
+     * Adds a job into the job queue.
+     *
+     * @param run
+     *            the descriptor of a job.
+     * @throws HyracksException
+     *             when the size of the queue exceeds its capacity.
+     */
+    void add(JobRun run) throws HyracksException;
+
+    /**
+     * Pulls a list of jobs from the job queue when more cluster capacity becomes available.
+     *
+     * @return a list of jobs whose capacity requirements can all be met at the same time.
+     */
+    List<JobRun> pull();
+
+    /**
+     * @return all pending jobs in the queue.
+     */
+    Collection<JobRun> jobs();
+
+}
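
Recall that JobManager (earlier in this commit) loads the concrete queue class named by ccConfig.jobQueueClassName reflectively and looks up an (IJobManager, IJobCapacityController) constructor. A minimal sketch of an alternative implementation that releases at most one job per pull; the SingleJobQueue class is hypothetical, and failure handling is reduced to a comment:

    package org.apache.hyracks.control.cc.scheduler;

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.List;

    import org.apache.hyracks.api.exceptions.HyracksException;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.api.job.resource.IJobCapacityController;
    import org.apache.hyracks.control.cc.job.IJobManager;
    import org.apache.hyracks.control.cc.job.JobRun;

    public class SingleJobQueue implements IJobQueue {

        private final IJobManager jobManager;
        private final IJobCapacityController capacityController;
        private final Deque<JobRun> queue = new ArrayDeque<>();

        // JobManager's reflective lookup requires exactly this constructor shape.
        public SingleJobQueue(IJobManager jobManager, IJobCapacityController capacityController) {
            this.jobManager = jobManager;
            this.capacityController = capacityController;
        }

        @Override
        public void add(JobRun run) throws HyracksException {
            queue.add(run);
        }

        @Override
        public List<JobRun> pull() {
            JobRun head = queue.peek();
            if (head == null) {
                return Collections.emptyList();
            }
            JobSpecification job = head.getActivityClusterGraphFactory().getJobSpecification();
            try {
                // Re-check capacity at pull time, as FIFOJobQueue does above.
                if (capacityController.allocate(job) == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
                    queue.poll();
                    return Collections.singletonList(head);
                }
            } catch (HyracksException e) {
                // The job can never fit; a complete implementation would fail it
                // via jobManager.prepareComplete(...), as FIFOJobQueue does.
                queue.poll();
            }
            return Collections.emptyList();
        }

        @Override
        public Collection<JobRun> jobs() {
            return Collections.unmodifiableCollection(queue);
        }
    }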

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
new file mode 100644
index 0000000..a4ac9e7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IResourceManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.scheduler;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+
+/**
+ * This interface abstracts the resource management of a cluster.
+ */
+public interface IResourceManager {
+
+    /**
+     * @return the maximum capacity of the cluster, assuming that there is no running job
+     *         that occupies capacity.
+     */
+    IReadOnlyClusterCapacity getMaximumCapacity();
+
+    /**
+     * @return the current capacity for computation.
+     */
+    IClusterCapacity getCurrentCapacity();
+
+    /**
+     * Updates the cluster capacity when a node is added, removed, or updated.
+     *
+     * @param nodeId
+     *            the id of the node to update.
+     * @param capacity
+     *            the capacity of one particular node.
+     * @throws HyracksException
+     *             when the parameters are invalid.
+     */
+    void update(String nodeId, NodeCapacity capacity) throws HyracksException;
+}
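
Since update() is the single entry point for node arrivals, departures, and changes, an implementation only needs to keep the aggregate in sync with a per-node map. A minimal, self-contained sketch of that bookkeeping with deliberately simplified types (ResourceManagerSketch and its single long-valued memory figure are hypothetical; the real NodeCapacity and IClusterCapacity types carry more dimensions):

    import java.util.HashMap;
    import java.util.Map;

    public class ResourceManagerSketch {

        private final Map<String, Long> nodeMemory = new HashMap<>(); // nodeId -> memory bytes
        private long maximumMemory; // aggregate over all registered nodes

        // A null capacity models node removal; any other value models add/update.
        public void update(String nodeId, Long memoryByteSize) {
            Long previous = memoryByteSize == null
                    ? nodeMemory.remove(nodeId)
                    : nodeMemory.put(nodeId, memoryByteSize);
            maximumMemory -= previous == null ? 0 : previous;
            maximumMemory += memoryByteSize == null ? 0 : memoryByteSize;
        }

        public long getMaximumMemory() {
            return maximumMemory;
        }

        public static void main(String[] args) {
            ResourceManagerSketch rm = new ResourceManagerSketch();
            rm.update("nc1", 1024L);
            rm.update("nc2", 2048L);
            rm.update("nc1", null); // nc1 leaves the cluster
            System.out.println(rm.getMaximumMemory()); // prints 2048
        }
    }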