You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 21:49:18 UTC
git commit: move the orchestrator submitter to embedded mode
Repository: airavata
Updated Branches:
refs/heads/master 4217a17e7 -> d7c711667
move the orchestrator submitter to embedded mode
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d7c71166
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d7c71166
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d7c71166
Branch: refs/heads/master
Commit: d7c711667cc4be267fc85c149a3fd372c07b4f53
Parents: 4217a17
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 15:49:01 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 15:49:01 2014 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 226 ++++++++++++++-----
.../airavata/common/utils/AiravataZKUtils.java | 7 +
.../main/resources/airavata-server.properties | 4 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 10 +-
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 2 +-
.../state/GfacExperimentStateChangeRequest.java | 2 +-
.../airavata/gfac/core/utils/GFacUtils.java | 203 ++++++++++++-----
.../gfac/core/utils/GfacExperimentState.java | 81 -------
.../core/impl/GFACServiceJobSubmitter.java | 3 +-
9 files changed, 333 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 170fb99..12f5435 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -64,23 +64,127 @@ public class CreateLaunchExperiment {
// addDescriptors();
// final String expId = createExperimentForSSHHost(airavata);
-// final String expId = createExperimentForTrestles(airavata);
+ final String expId = createExperimentForTrestles(airavata);
// final String expId = createExperimentForStampede(airavata);
- for (int i = 0; i < 1; i++) {
- final String expId = createExperimentForLocalHost(airavata);
+// final String expId = createExperimentForLocalHost(airavata);
// final String expId = createExperimentForLonestar(airavata);
// final String expId = createExperimentWRFTrestles(airavata);
- System.out.println("Experiment ID : " + expId);
+ System.out.println("Experiment ID : " + expId);
// updateExperiment(airavata, expId);
- launchExperiment(airavata, expId);
+ launchExperiment(airavata, expId);
+ System.out.println("Launched successfully");
+ List<Experiment> experiments = getExperimentsForUser(airavata, "admin");
+ List<ExperimentSummary> searchedExps1 = searchExperimentsByName(airavata, "admin", "echo");
+ List<ExperimentSummary> searchedExps2 = searchExperimentsByDesc(airavata, "admin", "Echo");
+ List<ExperimentSummary> searchedExps3 = searchExperimentsByApplication(airavata, "admin", "cho");
+ List<Project> projects = getAllUserProject(airavata, "admin");
+ List<Project> searchProjects1 = searchProjectsByProjectName(airavata, "admin", "project");
+ List<Project> searchProjects2 = searchProjectsByProjectDesc(airavata, "admin", "test");
+ for (Experiment exp : experiments){
+ System.out.println(" exp id : " + exp.getExperimentID());
+ System.out.println("experiment Description : " + exp.getDescription()) ;
+ if (exp.getExperimentStatus() != null) {
+ System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
+ }
+ }
+
+ for (ExperimentSummary exp : searchedExps1){
+ System.out.println("search results by experiment name");
+ System.out.println("experiment ID : " + exp.getExperimentID()) ;
+ System.out.println("experiment Description : " + exp.getDescription()) ;
+ if (exp.getExperimentStatus() != null) {
+ System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
+ }
+ }
+
+ for (ExperimentSummary exp : searchedExps2){
+ System.out.println("search results by experiment desc");
+ System.out.println("experiment ID : " + exp.getExperimentID()) ;
+ if (exp.getExperimentStatus() != null) {
+ System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
+ }
+ }
+
+ for (ExperimentSummary exp : searchedExps3){
+ System.out.println("search results by application");
+ System.out.println("experiment ID : " + exp.getExperimentID()) ;
+ if (exp.getExperimentStatus() != null) {
+ System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
+ }
}
+
+ for (Project pr : searchProjects1){
+ System.out.println(" project id : " + pr.getProjectID());
+ }
+
+ for (Project pr : searchProjects2){
+ System.out.println(" project id : " + pr.getProjectID());
+ System.out.println(" project desc : " + pr.getDescription());
+ }
+
+ Thread monitor = (new Thread(){
+ public void run() {
+ Map<String, JobStatus> jobStatuses = null;
+ while (true) {
+ try {
+ jobStatuses = airavata.getJobStatuses(expId);
+ Set<String> strings = jobStatuses.keySet();
+ for (String key : strings) {
+ JobStatus jobStatus = jobStatuses.get(key);
+ if(jobStatus == null){
+ return;
+ }else {
+ if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
+ System.out.println("Job completed Job ID: " + key);
+ return;
+ }else{
+ System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());
+ }
+ }
+ }
+ ExperimentStatus experimentStatus = airavata.getExperimentStatus(expId);
+ if(experimentStatus.getExperimentState().equals(ExperimentState.FAILED)){
+ return;
+ }
+ System.out.println(experimentStatus);
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+ });
+ monitor.start();
+ try {
+ monitor.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); // To change body of catch statement use
+ // File | Settings | File Templates.
+ }
+
+// System.out.println(airavata.getExperimentStatus(expId));
+ List<DataObjectType> output = airavata.getExperimentOutputs(expId);
+ for (DataObjectType dataObjectType : output) {
+ System.out.println(dataObjectType.getKey() + " : " + dataObjectType.getType() + " : " + dataObjectType.getValue());
+
+
+ }
+ String clonedExpId = cloneExperiment(airavata, expId);
+ System.out.println("Cloned Experiment ID : " + clonedExpId);
+// System.out.println("retrieved exp id : " + experiment.getExperimentID());
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
e.printStackTrace();
}
}
- public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
+ public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException {
try {
DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
documentCreator.createLocalHostDocs();
@@ -115,8 +219,8 @@ public class CreateLaunchExperiment {
return airavataAPI;
}
- public static String createExperimentForTrestles(Airavata.Client client) throws TException {
- try {
+ public static String createExperimentForTrestles(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -151,50 +255,50 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
- try {
+
+ public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("WRF_Namelist");
input.setType(DataType.URI);
input.setValue("/Users/raminder/Downloads/wrf_sample_inputs/namelist.input");
-
+
DataObjectType input1 = new DataObjectType();
input1.setKey("WRF_Input_File");
input1.setType(DataType.URI);
input1.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfinput_d01");
-
+
DataObjectType input2 = new DataObjectType();
input2.setKey("WRF_Boundary_File");
input2.setType(DataType.URI);
input2.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfbdy_d01");
-
+
exInputs.add(input);
exInputs.add(input1);
exInputs.add(input2);
-
+
List<DataObjectType> exOut = new ArrayList<DataObjectType>();
DataObjectType output = new DataObjectType();
output.setKey("WRF_Output");
output.setType(DataType.URI);
output.setValue("");
-
+
DataObjectType output1 = new DataObjectType();
output1.setKey("WRF_Execution_Log");
output1.setType(DataType.URI);
output1.setValue("");
-
-
+
+
exOut.add(output);
exOut.add(output1);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", "WRF", exInputs);
@@ -216,35 +320,35 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
- try {
+ public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
+ try{
return client.cloneExperiment(expId, "cloneExperiment1");
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static void updateExperiment(Airavata.Client client, String expId) throws TException {
- try {
+ public static void updateExperiment(Airavata.Client client, String expId) throws TException {
+ try{
Experiment experiment = client.getExperiment(expId);
experiment.setDescription("updatedDescription");
- client.updateExperiment(expId, experiment);
- } catch (TException e) {
+ client.updateExperiment(expId, experiment );
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
- try {
+ public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -282,14 +386,14 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
- try {
+
+ public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -328,14 +432,13 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentForStampede(Airavata.Client client) throws TException {
- try {
+ public static String createExperimentForStampede(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -374,14 +477,13 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentForLonestar(Airavata.Client client) throws TException {
- try {
+ public static String createExperimentForLonestar(Airavata.Client client) throws TException {
+ try{
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -429,15 +531,15 @@ public class CreateLaunchExperiment {
}
}
throw e;
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
-
- public static void launchExperiment(Airavata.Client client, String expId)
- throws TException {
+
+
+ public static void launchExperiment (Airavata.Client client, String expId)
+ throws TException{
try {
client.launchExperiment(expId, "testToken");
} catch (ExperimentNotFoundException e) {
@@ -452,13 +554,13 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while launching the experiment...", e.getMessage());
throw new AiravataClientException(e);
- } catch (TException e) {
+ }catch (TException e) {
logger.error("Error occured while launching the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
+ public static List<Experiment> getExperimentsForUser (Airavata.Client client, String user){
try {
return client.getAllUserExperiments(user);
} catch (AiravataSystemException e) {
@@ -467,13 +569,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<Project> getAllUserProject(Airavata.Client client, String user) {
+ public static List<Project> getAllUserProject (Airavata.Client client, String user){
try {
return client.getAllUserProjects(user);
} catch (AiravataSystemException e) {
@@ -482,13 +584,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<Project> searchProjectsByProjectName(Airavata.Client client, String user, String projectName) {
+ public static List<Project> searchProjectsByProjectName (Airavata.Client client, String user, String projectName){
try {
return client.searchProjectsByProjectName(user, projectName);
} catch (AiravataSystemException e) {
@@ -497,13 +599,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<Project> searchProjectsByProjectDesc(Airavata.Client client, String user, String desc) {
+ public static List<Project> searchProjectsByProjectDesc (Airavata.Client client, String user, String desc){
try {
return client.searchProjectsByProjectDesc(user, desc);
} catch (AiravataSystemException e) {
@@ -512,14 +614,14 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByName(Airavata.Client client, String user, String expName) {
+ public static List<ExperimentSummary> searchExperimentsByName (Airavata.Client client, String user, String expName){
try {
return client.searchExperimentsByName(user, expName);
} catch (AiravataSystemException e) {
@@ -528,13 +630,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc) {
+ public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc){
try {
return client.searchExperimentsByDesc(user, desc);
} catch (AiravataSystemException e) {
@@ -543,13 +645,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app) {
+ public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app){
try {
return client.searchExperimentsByApplication(user, app);
} catch (AiravataSystemException e) {
@@ -558,7 +660,7 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- } catch (TException e) {
+ }catch (TException e){
e.printStackTrace();
}
return null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 7349b7e..be5f11e 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -37,6 +37,13 @@ public class AiravataZKUtils {
+ experimentId + "+" + taskId;
}
+ public static String getExpZnodeHandlerPath(String experimentId, String taskId,String className) throws ApplicationSettingsException {
+ return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) +
+ File.separator +
+ ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator
+ + experimentId + "+" + taskId + File.separator + className;
+ }
+
public static String getZKhostPort() throws ApplicationSettingsException {
return ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT);
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index ecc0932..726225a 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -279,8 +279,8 @@ connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
###---------------------------Orchestrator module Configurations---------------------------###
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
submitter.interval=10000
threadpool.size=10
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 6fb3e24..fa7eb33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -56,8 +56,9 @@ import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.core.utils.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
@@ -426,6 +427,7 @@ public class BetterGfacImpl implements GFac {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
@@ -438,6 +440,8 @@ public class BetterGfacImpl implements GFac {
}
try {
handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
+ // if invoke() throws, the state update above is never reached, so the plugin is not marked INVOKED
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
}
@@ -581,6 +585,7 @@ public class BetterGfacImpl implements GFac {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
@@ -593,9 +598,12 @@ public class BetterGfacImpl implements GFac {
} catch (IllegalAccessException e) {
log.error(e.getMessage());
throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (Exception e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
}
try {
handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
} catch (Exception e) {
// TODO: Better error reporting.
throw new GFacException("Error Executing a OutFlow Handler", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 5bc789c..1bc639a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,7 +58,7 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.core.utils.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index 5f7f2c2..704bf26 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.core.monitor.state;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.utils.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
public class GfacExperimentStateChangeRequest {
private GfacExperimentState state;
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index f67b592..7624c8c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -20,11 +20,7 @@
*/
package org.apache.airavata.gfac.core.utils;
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
@@ -41,6 +37,8 @@ import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.api.workflow.ApplicationJob;
@@ -50,8 +48,11 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.*;
import org.apache.axiom.om.OMElement;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,16 +93,18 @@ public class GFacUtils {
/**
* this can be used to do framework opertaions specific to different modes
+ *
* @param jobExecutionContext
* @return
*/
- public static boolean isSynchronousMode(JobExecutionContext jobExecutionContext){
+ public static boolean isSynchronousMode(JobExecutionContext jobExecutionContext) {
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- if(ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration.getExecutionMode())){
+ if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration.getExecutionMode())) {
return false;
}
return true;
}
+
public static String readFileToString(String file) throws FileNotFoundException, IOException {
BufferedReader instream = null;
try {
@@ -139,9 +142,6 @@ public class GFacUtils {
}
-
-
-
public static String createGsiftpURIAsString(String host, String localPath) throws URISyntaxException {
StringBuffer buf = new StringBuffer();
if (!host.startsWith("gsiftp://"))
@@ -159,7 +159,7 @@ public class GFacUtils {
actualParameter = new ActualParameter(StringParameterType.type);
if (!"".equals(element.getValue())) {
((StringParameterType) actualParameter.getType()).setValue(element.getValue());
- } else {
+ } else {
((StringParameterType) actualParameter.getType()).setValue("");
}
} else if ("Double".equals(parameter.getParameterType().getName())) {
@@ -203,8 +203,7 @@ public class GFacUtils {
((StdOutParameterType) actualParameter.getType()).setValue("");
}
- }
- else if ("StdErr".equals(parameter.getParameterType().getName())) {
+ } else if ("StdErr".equals(parameter.getParameterType().getName())) {
actualParameter = new ActualParameter(StdErrParameterType.type);
if (!"".equals(element.getValue())) {
((StdErrParameterType) actualParameter.getType()).setValue(element.getValue());
@@ -215,6 +214,7 @@ public class GFacUtils {
}
return actualParameter;
}
+
public static ActualParameter getInputActualParameter(Parameter parameter, OMElement element) {
OMElement innerelement = null;
ActualParameter actualParameter = new ActualParameter();
@@ -394,7 +394,7 @@ public class GFacUtils {
return actualParameter;
}
- public static ActualParameter getInputActualParameter(Parameter parameter, String inputVal) throws GFacException{
+ public static ActualParameter getInputActualParameter(Parameter parameter, String inputVal) throws GFacException {
OMElement innerelement = null;
ActualParameter actualParameter = new ActualParameter();
if ("String".equals(parameter.getParameterType().getName())) {
@@ -474,7 +474,7 @@ public class GFacUtils {
innerelement = (OMElement) value.next();
((URIArrayType) actualParameter.getType()).insertValue(i++, innerelement.getText());
}
- } else{
+ } else {
throw new GFacException("Input parameters are not configured properly ");
}
return actualParameter;
@@ -604,55 +604,57 @@ public class GFacUtils {
log.error("Error in persisting application job data for application job " + job.getJobId() + "!!!", e);
}
}
+
public static void saveJobStatus(JobExecutionContext jobExecutionContext, JobDetails details, JobState state) throws GFacException {
- try {
- Registry registry = jobExecutionContext.getRegistry();
- JobStatus status = new JobStatus();
- status.setJobState(state);
- details.setJobStatus(status);
- registry.add(ChildDataType.JOB_DETAIL,details, new CompositeIdentifier(jobExecutionContext.getTaskData().getTaskID(), details.getJobID()));
- } catch (Exception e) {
- throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
- }
- }
-
- public static void updateJobStatus(JobExecutionContext jobExecutionContext,JobDetails details, JobState state) throws GFacException {
- try {
+ try {
Registry registry = jobExecutionContext.getRegistry();
- JobStatus status = new JobStatus();
- status.setJobState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setJobStatus(status);
- registry.update(org.apache.airavata.registry.cpi.RegistryModelType.JOB_DETAIL, details, details.getJobID());
- } catch (Exception e) {
- throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
- }
- }
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ details.setJobStatus(status);
+ registry.add(ChildDataType.JOB_DETAIL, details, new CompositeIdentifier(jobExecutionContext.getTaskData().getTaskID(), details.getJobID()));
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status" + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void updateJobStatus(JobExecutionContext jobExecutionContext, JobDetails details, JobState state) throws GFacException {
+ try {
+ Registry registry = jobExecutionContext.getRegistry();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setJobStatus(status);
+ registry.update(org.apache.airavata.registry.cpi.RegistryModelType.JOB_DETAIL, details, details.getJobID());
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status" + e.getLocalizedMessage(), e);
+ }
+ }
+
public static void saveErrorDetails(JobExecutionContext jobExecutionContext, String errorMessage, CorrectiveAction action, ErrorCategory errorCatogory) throws GFacException {
- try {
- Registry registry = RegistryFactory.getDefaultRegistry();
- ErrorDetails details = new ErrorDetails();
- details.setActualErrorMessage(errorMessage);
- details.setCorrectiveAction(action);
- details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
- details.setCreationTime(Calendar.getInstance().getTimeInMillis());
- details.setErrorCategory(errorCatogory);
- registry.add(ChildDataType.ERROR_DETAIL, details, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- throw new GFacException("Error persisting job status" + e.getLocalizedMessage(),e);
- }
+ try {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ ErrorDetails details = new ErrorDetails();
+ details.setActualErrorMessage(errorMessage);
+ details.setCorrectiveAction(action);
+ details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
+ details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ details.setErrorCategory(errorCatogory);
+ registry.add(ChildDataType.ERROR_DETAIL, details, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status" + e.getLocalizedMessage(), e);
+ }
}
public static Map<String, Object> getMessageContext(List<DataObjectType> experimentData,
Parameter[] parameters) throws GFacException {
HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();
- Map<String,DataObjectType> map = new HashMap<String,DataObjectType>();
- for(DataObjectType objectType : experimentData){
- map.put(objectType.getKey(), objectType);
+ Map<String, DataObjectType> map = new HashMap<String, DataObjectType>();
+ for (DataObjectType objectType : experimentData) {
+ map.put(objectType.getKey(), objectType);
}
for (int i = 0; i < parameters.length; i++) {
- DataObjectType input = map.get(parameters[i].getParameterName());
+ DataObjectType input = map.get(parameters[i].getParameterName());
if (input != null) {
stringObjectHashMap.put(parameters[i].getParameterName(), GFacUtils.getInputActualParameter(parameters[i], input));
} else {
@@ -662,7 +664,7 @@ public class GFacUtils {
return stringObjectHashMap;
}
- public static GfacExperimentState getZKExperimentState(ZooKeeper zk,JobExecutionContext jobExecutionContext)
+ public static GfacExperimentState getZKExperimentState(ZooKeeper zk, JobExecutionContext jobExecutionContext)
throws ApplicationSettingsException, KeeperException, InterruptedException {
String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID());
@@ -673,11 +675,102 @@ public class GFacUtils {
throws ApplicationSettingsException, KeeperException, InterruptedException {
String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID());
- if(expState == null){
+ if (expState == null) {
return -1;
}
return Integer.parseInt(expState);
}
+ public static boolean createPluginZnode(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className)
+ throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+ Stat exists = zk.exists(expState, false);
+ if (exists == null) {
+ zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists == null) {
+ zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists != null) {
+ zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, GfacPluginState.INVOKING.toString().getBytes(),
+ exists.getVersion());
+ }
+ return true;
+ }
+
+ public static boolean createPluginZnode(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
+ GfacPluginState state)throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+ Stat exists = zk.exists(expState, false);
+ if (exists == null) {
+ zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists == null) {
+ zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists != null) {
+ zk.setData(expState + File.separator +
+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
+ exists.getVersion());
+ }
+ return true;
+ }
+
+ public static boolean updatePluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
+ GfacPluginState state)
+ throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+
+ Stat exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists != null) {
+ zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
+ exists.getVersion());
+ }else {
+ createPluginZnode(zk, jobExecutionContext, className,state);
+ }
+ return true;
+ }
+
+ public static boolean getPluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) {
+ try {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+
+ Stat exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists != null) {
+ return Boolean.valueOf(new String(zk.getData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false, exists)));
+ }
+ return false; // if the node doesn't exist or any other error we return false
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
deleted file mode 100644
index db2cab0..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.core.utils;
-
-
-public enum GfacExperimentState {
- LAUNCHED(0),
- ACCEPTED(1),
- INHANDLERSINVOKING(2),
- INHANDLERSINVOKED(3),
- PROVIDERINVOKING(4),
- PROVIDERINVOKED(5),
- OUTHANDLERSINVOKING(6),
- OUTHANDLERSINVOKED(7),
- COMPLETED(8),
- FAILED(9),
- UNKNOWN(10);
-
- private final int value;
-
- private GfacExperimentState(int value) {
- this.value = value;
- }
-
- /**
- * Get the integer value of this enum value, as defined in the Thrift IDL.
- */
- public int getValue() {
- return value;
- }
-
- /**
- * Find a the enum type by its integer value, as defined in the Thrift IDL.
- *
- * @return null if the value is not found.
- */
- public static GfacExperimentState findByValue(int value) {
- switch (value) {
- case 0:
- return INHANDLERSINVOKING;
- case 1:
- return INHANDLERSINVOKED;
- case 2:
- return PROVIDERINVOKING;
- case 3:
- return PROVIDERINVOKED;
- case 4:
- return OUTHANDLERSINVOKING;
- case 5:
- return OUTHANDLERSINVOKED;
- case 6:
- return COMPLETED;
- case 7:
- return FAILED;
- case 8:
- return UNKNOWN;
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d7c71166/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index d85e731..00cb130 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -21,11 +21,10 @@
package org.apache.airavata.orchestrator.core.impl;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.core.utils.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;