You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2014/09/26 17:19:55 UTC

[09/15] git commit: Get max job count from compute resource description of particular resource instead of global max value

Get max job count from compute resource description of particular resource instead of global max value


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8bc79959
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8bc79959
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8bc79959

Branch: refs/heads/orchestratorJobThrottleFeature
Commit: 8bc79959ff306f7236b233377c6acf94aa7078ab
Parents: a78137b
Author: shamrath <sh...@gmail.com>
Authored: Mon Sep 22 10:22:00 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Mon Sep 22 10:22:00 2014 -0400

----------------------------------------------------------------------
 .../handlers/GridPullMonitorHandler.java        |  19 ++-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |  23 ++-
 .../airavata/gfac/monitor/util/CommonUtils.java |  50 ++++++
 modules/orchestrator/orchestrator-core/pom.xml  |   2 +-
 .../core/validator/impl/JobCountValidator.java  | 162 ++++++++-----------
 5 files changed, 148 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 451466d..5188163 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.monitor.handlers;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.util.AuthenticationInfo;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
@@ -32,8 +33,9 @@ import org.apache.airavata.gfac.monitor.HPCMonitorID;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.credential.store.util.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -101,7 +103,20 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
             }
             CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
             if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) {
-                CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
+                try {
+                    TaskDetails taskDetails = monitorID.getJobExecutionContext().getTaskData();
+
+                    ComputeResourceDescription computeResourceDescription =
+                            CommonUtils.getComputeResourceDescription(taskDetails);
+                    if (computeResourceDescription.getBatchQueues().size() > 0 &&
+                            computeResourceDescription.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
+
+                    CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
+                    }
+                } catch (Exception e) {
+                    logger.error("Error reading max job count from Computer Resource Description," +
+                            " zookeeper job count update process failed");
+                }
             }
         } catch (AiravataMonitorException e) {
             logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID());

http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 25a1ab2..dcbe905 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -38,6 +38,7 @@ import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.credential.store.util.AuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -273,15 +274,21 @@ public class HPCPullMonitor extends PullMonitor {
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
                 if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) { // is job restriction available?
-                    if (zk == null) {
-                        zk = completedJob.getJobExecutionContext().getZk();
-                    }
-                    String key = CommonUtils.getJobCountUpdatePath(completedJob);
-                    int i = 0;
-                    if (jobRemoveCountMap.containsKey(key)) {
-                        i = Integer.valueOf(jobRemoveCountMap.get(key));
+                    ComputeResourceDescription computeResourceDesc = CommonUtils.getComputeResourceDescription(
+                            completedJob.getJobExecutionContext().getTaskData());
+                    if (computeResourceDesc.getBatchQueuesSize() > 0 && computeResourceDesc.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
+                        if (zk == null) {
+                            zk = completedJob.getJobExecutionContext().getZk();
+                        }
+                        String key = CommonUtils.getJobCountUpdatePath(completedJob);
+                        int i = 0;
+                        if (jobRemoveCountMap.containsKey(key)) {
+                            i = Integer.valueOf(jobRemoveCountMap.get(key));
+                        }
+                        jobRemoveCountMap.put(key, ++i);
+                    } else {
+                        // ignore
                     }
-                    jobRemoveCountMap.put(key, ++i);
                 }
             }
             if (ServerSettings.getEnableJobRestrictionValidation().equals("true") && completedJobs.size() > 0) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index fb4d898..923eb78 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,18 +20,29 @@
 */
 package org.apache.airavata.gfac.monitor.util;
 
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.ComputeResource;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.aiaravata.application.catalog.data.resources.AbstractResource;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.handler.GFacHandler;
 import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -43,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -251,6 +263,7 @@ public class CommonUtils {
             for (String path : changeCountMap.keySet()) {
                 if (isAdd) {
                     CommonUtils.checkAndCreateZNode(zk, path);
+                    logger.info("Recursively created znode : " + path);
                 }
                 byte[] byteData = zk.getData(path, null, null);
                 String nodeData;
@@ -330,4 +343,41 @@ public class CommonUtils {
             zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
         }
     }
+    /** Resolves the compute resource for a task: finds deployments of the application's first module (narrowed to the task's resource host id when one is set), lets the HostScheduler named in ServerSettings pick one, and returns that host's description; any exception is rethrown wrapped in AiravataException. */
+    public static ComputeResourceDescription getComputeResourceDescription(TaskDetails taskDetails) throws AiravataException {
+        try {
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface().
+                    getApplicationInterface(taskDetails.getApplicationId());
+            // Build the deployment filter: always the first application module, plus the requested host when the task names one.
+            List<String> applicationModules = applicationInterface.getApplicationModules();
+            String selectedModuleId = applicationModules.get(0); // get the first module
+            Map<String, String> moduleIdFilter = new HashMap<String, String>();
+            moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId);
+            if (taskDetails.getTaskScheduling()!=null && taskDetails.getTaskScheduling().getResourceHostId() != null) {
+                moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID,
+                        taskDetails.getTaskScheduling().getResourceHostId());
+            }
+            List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment()
+                    .getApplicationDeployements(moduleIdFilter);
+            Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap =
+                    new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>();
+            ComputeResource computeResource = appCatalog.getComputeResource();
+            for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { // index each deployment by the description of its compute host
+                deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()),
+                        deploymentDescription);
+            }
+            List<ComputeResourceDescription> computeHostList = new ArrayList<ComputeResourceDescription>();
+            computeHostList.addAll(deploymentMap.keySet());
+            Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass( // scheduler class name is read from server settings
+                    HostScheduler.class);
+            HostScheduler hostScheduler = aClass.newInstance(); // reflective instantiation through the no-arg constructor
+            ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); // NOTE(review): local variable has the same name as its type
+            ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription); // NOTE(review): null if schedule() returns something that is not a key of deploymentMap; the resulting NPE on the next line would surface as AiravataException via the catch below - confirm schedule()'s contract
+            return appCatalog.getComputeResource().getComputeResource(applicationDeploymentDescription.getComputeHostId()); // looks the description up again by the chosen deployment's host id
+        } catch (Exception e) {
+            throw new AiravataException("Error while getting Compute Resource Description", e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 61a7efc..576690a 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -68,7 +68,7 @@ the License. -->
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-gfac-hpc-monitor</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
+            <!--<scope>test</scope>-->
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
index 2c66fa2..0fb98ac 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
@@ -23,17 +23,14 @@ package org.apache.airavata.orchestrator.core.validator.impl;
 
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.airavata.appcatalog.cpi.ComputeResource;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
-import org.apache.aiaravata.application.catalog.data.resources.AbstractResource;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.RequestData;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
@@ -47,9 +44,6 @@ import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -63,79 +57,50 @@ public class JobCountValidator implements JobMetadataValidator {
                                     String credStoreToken) {
         ValidatorResult result;
         try {
-            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-            ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface().
-                    getApplicationInterface(taskID.getApplicationId());
-
-            List<String> applicationModules = applicationInterface.getApplicationModules();
-            String selectedModuleId = applicationModules.get(0);
-            Map<String, String> moduleIdFilter = new HashMap<String, String>();
-            moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId);
-            if (taskID.getTaskScheduling()!=null && taskID.getTaskScheduling().getResourceHostId() != null) {
-                moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID,
-                        taskID.getTaskScheduling().getResourceHostId());
-            }
-            List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment()
-                    .getApplicationDeployements(moduleIdFilter);
-            Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap =
-                    new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>();
-            ComputeResource computeResource = appCatalog.getComputeResource();
-            for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) {
-                deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()),
-                        deploymentDescription);
-            }
-            List<ComputeResourceDescription> computeHostList = new ArrayList<ComputeResourceDescription>();
-            computeHostList.addAll(deploymentMap.keySet());
-
-            Class<? extends HostScheduler> aClass = Class.forName(
-                    ServerSettings.getHostScheduler()).asSubclass(
-                    HostScheduler.class);
-            HostScheduler hostScheduler = aClass.newInstance();
-            ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList);
-            ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription);
-
-            ComputeResourceDescription computeResourceDescription = appCatalog.getComputeResource().
-                    getComputeResource(applicationDeploymentDescription.getComputeHostId());
-            for (JobSubmissionInterface jobSubmissionInterface : computeResourceDescription.getJobSubmissionInterfaces()) {
-                switch (jobSubmissionInterface.getJobSubmissionProtocol()) {
-                    case LOCAL:
-                        // nothing to do
-                        return new ValidatorResult(true);
-                    case SSH:
-                        SSHJobSubmission sshJobSubmission =
-                                appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-                        switch (sshJobSubmission.getSecurityProtocol()) {
-                            case GSI:
-                                // gsi
-                                RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
-                                requestData.setTokenId(credStoreToken);
-                                if (isJobSpaceAvailable(requestData.getMyProxyUserName(), computeHostList)) {
-                                    return new ValidatorResult(true);
-                                } else {
+            ComputeResourceDescription computeResourceDes = CommonUtils.getComputeResourceDescription(taskID);
+            if (computeResourceDes.getBatchQueuesSize() > 0 &&
+                    computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
+                int resourceMaxJobCount = computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue();
+                for (JobSubmissionInterface jobSubmissionInterface : computeResourceDes.getJobSubmissionInterfaces()) {
+                    switch (jobSubmissionInterface.getJobSubmissionProtocol()) {
+                        case LOCAL:
+                            // nothing to do
+                            return new ValidatorResult(true);
+                        case SSH:
+                            SSHJobSubmission sshJobSubmission =
+                                    AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission(
+                                            jobSubmissionInterface.getJobSubmissionInterfaceId());
+                            switch (sshJobSubmission.getSecurityProtocol()) {
+                                case GSI:
+                                    // gsi
+                                    RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
+                                    requestData.setTokenId(credStoreToken);
+                                    return isJobSpaceAvailable(requestData.getMyProxyUserName(),
+                                            computeResourceDes.getHostName(), resourceMaxJobCount);
+                                case SSH_KEYS:
                                     result = new ValidatorResult(false);
-                                    result.setErrorDetails("Please honour to the gobal max job count " + ServerSettings.getGlobalMaxJobCount());
+                                    result.setErrorDetails("SSH_KEY base job count validation is not yet implemented");
                                     return result;
-                                }
-//                                TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
-                            case SSH_KEYS:
-                                result = new ValidatorResult(false);
-                                result.setErrorDetails("SSH_KEY base job count validation is not yet implemented");
-                                return result;
                                 // ssh
-                            default:
-                                result = new ValidatorResult(false);
-                                result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() + " protocol yet");
-                                return result;
-                        }
-                    default:
-                        result = new ValidatorResult(false);
-                        result.setErrorDetails("Doesn't support " + jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet");
-                        return result;
+                                default:
+                                    result = new ValidatorResult(false);
+                                    result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() +
+                                            " protocol yet");
+                                    return result;
+                            }
+                        default:
+                            result = new ValidatorResult(false);
+                            result.setErrorDetails("Doesn't support " +
+                                    jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet");
+                            return result;
+                    }
                 }
+                result = new ValidatorResult(false);
+                result.setErrorDetails("No JobSubmission interface found");
+                return result;
+            } else {
+                return new ValidatorResult(true);
             }
-            result = new ValidatorResult(false);
-            result.setErrorDetails("No JobSubmission interface found");
-            return result;
         } catch (Exception e) {
             result = new ValidatorResult(false);
             result.setErrorDetails("Exception occur while running validation process ");
@@ -144,29 +109,32 @@ public class JobCountValidator implements JobMetadataValidator {
 
     }
 
-    private boolean isJobSpaceAvailable(String communityUserName, List<ComputeResourceDescription> computeHostList) throws ApplicationSettingsException {
-        String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName).append("/").toString();
-        for (ComputeResourceDescription computeResDesc : computeHostList) {
-            String key = keyPath + computeResDesc.getHostName() + "/" + Constants.JOB;
-            Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk());
-            if (jobCountMap.containsKey(key)) {
-                int count = jobCountMap.get(key);
-                if (count < Integer.parseInt(ServerSettings.getGlobalMaxJobCount())) {
-                    return true;
-                }
-            }else {
-                return true;
+    private ValidatorResult isJobSpaceAvailable(String communityUserName, String computeHostName, int resourceMaxJobCount)
+            throws ApplicationSettingsException {
+        if (communityUserName == null) {
+            throw new IllegalArgumentException("Community user name should not be null");
+        }
+        if (computeHostName == null) {
+            throw new IllegalArgumentException("Compute resource should not be null");
+        }
+        String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName)
+                .append("/").toString();
+        String key = keyPath + computeHostName + "/" + Constants.JOB;
+        Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk());
+        if (jobCountMap.containsKey(key)) {
+            int count = jobCountMap.get(key);
+            logger.info("Submitted job count = " + count + ", max job count = " + resourceMaxJobCount);
+            if (count < resourceMaxJobCount) {
+                return new ValidatorResult(true);
             }
+        } else {
+            logger.info("Job count map doesn't has key : " + key);
+            return new ValidatorResult(true);
         }
-        return false;
-    }
-
-    private void getAppDeployment(String applicationId, TaskDetail taskData) throws AppCatalogException {
-        return;
-
-    }
-
-    private ApplicationDeploymentDescription getAppDeployment(AppCatalog appCatalog, TaskDetail taskData, String selectedModuleId) {
-        return null;
+        logger.info("Resource " + computeHostName + " doesn't has space to submit another job, " +
+                "Configured resource max job count is " + resourceMaxJobCount + ".");
+        ValidatorResult result = new ValidatorResult(false);
+        result.setErrorDetails("Please honour to the gobal max job count " + resourceMaxJobCount);
+        return result;
     }
 }