You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/19 21:08:31 UTC
airavata git commit: adding more fixes to worker based submission
Repository: airavata
Updated Branches:
refs/heads/queue-gfac-rabbitmq 48be39fea -> cb6c4ccf2
adding more fixes to worker based submission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cb6c4ccf
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cb6c4ccf
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cb6c4ccf
Branch: refs/heads/queue-gfac-rabbitmq
Commit: cb6c4ccf267c165e64d6858584b2300eaaa37df4
Parents: 48be39f
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Thu Mar 19 16:08:29 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Thu Mar 19 16:08:29 2015 -0400
----------------------------------------------------------------------
.../airavata/api/server/AiravataAPIServer.java | 4 +-
.../client/samples/CreateLaunchExperiment.java | 432 ++++++++++---------
.../airavata/common/utils/AiravataZKUtils.java | 5 +
.../apache/airavata/common/utils/Constants.java | 1 +
.../main/resources/airavata-server.properties | 10 +-
.../main/resources/airavata-server.properties | 1 +
.../airavata/gfac/server/GfacServerHandler.java | 14 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 5 +-
.../core/monitor/GfacInternalStatusUpdator.java | 32 +-
.../airavata/gfac/core/monitor/MonitorID.java | 3 +-
.../airavata/gfac/core/utils/GFacUtils.java | 127 +++---
.../gfac/local/provider/impl/LocalProvider.java | 4 +-
.../airavata/gfac/monitor/util/CommonUtils.java | 2 +-
.../core/impl/RabbitMQTaskLaunchConsumer.java | 5 +-
.../server/OrchestratorServerHandler.java | 4 +-
.../util/OrchestratorRecoveryHandler.java | 2 +-
.../core/impl/GFACPassiveJobSubmitter.java | 44 +-
.../core/impl/GFACRPCJobSubmitter.java | 4 +-
18 files changed, 369 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
index 2556df1..159e0e4 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
@@ -127,7 +127,7 @@ public class AiravataAPIServer implements IServer, Watcher{
String rabbitMqBrokerURL = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.RABBITMQ_BROKER_URL);
String rabbitMqExchange = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.RABBITMQ_EXCHANGE);
String rabbitMq = rabbitMqBrokerURL + File.separator + rabbitMqExchange;
- zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data
String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE, "/airavata-server");
String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE, "/orchestrator-server");
String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
@@ -309,7 +309,7 @@ public class AiravataAPIServer implements IServer, Watcher{
case Expired:case Disconnected:
try {
mutex = -1;
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index f499345..231da87 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -52,13 +52,13 @@ public class CreateLaunchExperiment {
public static final int THRIFT_SERVER_PORT = 8930;
// public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
// public static final int THRIFT_SERVER_PORT = 9930;
-
+
private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
private static final String DEFAULT_USER = "default.registry.user";
private static final String DEFAULT_GATEWAY = "php_reference_gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_1365a7fd-eae1-4575-b447-99afb4d79c82";
+ private static String echoAppId = "Echo_78c785c6-3748-4a93-8569-19fee56581bc";
private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926";
@@ -78,27 +78,26 @@ public class CreateLaunchExperiment {
private static String gatewayId;
- // unicore service endpoint url
+ // unicore service endpoint url
private static final String unicoreEndPointURL = "https://fsd-cloud15.zam.kfa-juelich.de:7000/INTEROP1/services/BESFactory?res=default_bes_factory";
-
-
+
+
public static void main(String[] args) throws Exception {
airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavataClient.getAPIVersion());
- getAvailableAppInterfaceComputeResources("Echo_4fb76cb3-6bf6-409e-aa17-7cc7ee6e41af");
// createGateway();
// getGateway("testGatewayId");
// registerApplications(); // run this only the first time
-// createAndLaunchExp();
+ createAndLaunchExp();
}
-
+
private static String fsdResourceId;
public static void getAvailableAppInterfaceComputeResources(String appInterfaceId) {
try {
Map<String, String> availableAppInterfaceComputeResources = airavataClient.getAvailableAppInterfaceComputeResources(appInterfaceId);
- for (String key : availableAppInterfaceComputeResources.keySet()){
+ for (String key : availableAppInterfaceComputeResources.keySet()) {
System.out.println("id : " + key);
System.out.println("name : " + availableAppInterfaceComputeResources.get(key));
}
@@ -115,7 +114,7 @@ public class CreateLaunchExperiment {
}
- public static void createGateway(){
+ public static void createGateway() {
try {
Gateway gateway = new Gateway();
gateway.setGatewayId("testGatewayId2");
@@ -134,14 +133,14 @@ public class CreateLaunchExperiment {
}
- public static void getGateway(String gatewayId){
+ public static void getGateway(String gatewayId) {
try {
Gateway gateway = airavataClient.getGateway(gatewayId);
gateway.setDomain("testDomain");
airavataClient.updateGateway(gatewayId, gateway);
List<Gateway> allGateways = airavataClient.getAllGateways();
System.out.println(allGateways.size());
- if (airavataClient.isGatewayExist(gatewayId)){
+ if (airavataClient.isGatewayExist(gatewayId)) {
Gateway gateway1 = airavataClient.getGateway(gatewayId);
System.out.println(gateway1.getGatewayName());
}
@@ -161,10 +160,9 @@ public class CreateLaunchExperiment {
public static void createAndLaunchExp() throws TException {
-// final String expId = createEchoExperimentForFSD(airavataClient);
List<String> experimentIds = new ArrayList<String>();
try {
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 100; i++) {
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
@@ -192,11 +190,11 @@ public class CreateLaunchExperiment {
launchExperiment(airavataClient, expId);
}
- Thread.sleep(100);
- for (String exId : experimentIds) {
- Experiment experiment = airavataClient.getExperiment(exId);
- System.out.println(experiment.getExperimentStatus().toString());
- }
+ Thread.sleep(10000);
+ for (String exId : experimentIds) {
+ Experiment experiment = airavataClient.getExperiment(exId);
+ System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name());
+ }
} catch (Exception e) {
@@ -206,8 +204,7 @@ public class CreateLaunchExperiment {
}
}
-
-
+
public static void registerApplications() {
RegisterSampleApplications registerSampleApplications = new RegisterSampleApplications(airavataClient);
@@ -233,25 +230,25 @@ public class CreateLaunchExperiment {
}
public static String registerUnicoreEndpoint(String hostName, String hostDesc, JobSubmissionProtocol protocol, SecurityProtocol securityProtocol) throws TException {
-
- ComputeResourceDescription computeResourceDescription = RegisterSampleApplicationsUtils
- .createComputeResourceDescription(hostName, hostDesc, null, null);
-
- fsdResourceId = airavataClient.registerComputeResource(computeResourceDescription);
-
- if (fsdResourceId.isEmpty())
- throw new AiravataClientException();
-
- System.out.println("FSD Compute ResourceID: "+fsdResourceId);
-
- JobSubmissionInterface jobSubmission = RegisterSampleApplicationsUtils.createJobSubmissionInterface(fsdResourceId, protocol, 2);
- UnicoreJobSubmission ucrJobSubmission = new UnicoreJobSubmission();
- ucrJobSubmission.setSecurityProtocol(securityProtocol);
- ucrJobSubmission.setUnicoreEndPointURL(unicoreEndPointURL);
-
- return jobSubmission.getJobSubmissionInterfaceId();
- }
-
+
+ ComputeResourceDescription computeResourceDescription = RegisterSampleApplicationsUtils
+ .createComputeResourceDescription(hostName, hostDesc, null, null);
+
+ fsdResourceId = airavataClient.registerComputeResource(computeResourceDescription);
+
+ if (fsdResourceId.isEmpty())
+ throw new AiravataClientException();
+
+ System.out.println("FSD Compute ResourceID: " + fsdResourceId);
+
+ JobSubmissionInterface jobSubmission = RegisterSampleApplicationsUtils.createJobSubmissionInterface(fsdResourceId, protocol, 2);
+ UnicoreJobSubmission ucrJobSubmission = new UnicoreJobSubmission();
+ ucrJobSubmission.setSecurityProtocol(securityProtocol);
+ ucrJobSubmission.setUnicoreEndPointURL(unicoreEndPointURL);
+
+ return jobSubmission.getJobSubmissionInterfaceId();
+ }
+
public static String createEchoExperimentForTrestles(Airavata.Client client) throws TException {
try {
List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
@@ -299,28 +296,27 @@ public class CreateLaunchExperiment {
}
return null;
}
-
-
+
+
public static String createEchoExperimentForFSD(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
- inputDataObjectType.setValue("Hello World");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo2")) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
+ inputDataObjectType.setValue("Hello World");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo2")) {
inputDataObjectType.setValue("http://www.textfiles.com/100/ad.txt");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo3")) {
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo3")) {
inputDataObjectType.setValue("file:///tmp/test.txt");
- }
+ }
}
List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId);
- Experiment simpleExperiment =
+ Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "echoExperiment", "SimpleEcho2", echoAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
-
-
-
+
+
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
if (computeResources != null && computeResources.size() != 0) {
for (String id : computeResources.keySet()) {
@@ -331,13 +327,13 @@ public class CreateLaunchExperiment {
userConfigurationData.setAiravataAutoSchedule(false);
userConfigurationData.setOverrideManualScheduledParams(false);
userConfigurationData.setComputationalResourceScheduling(scheduling);
-
+
// set output directory
AdvancedOutputDataHandling dataHandling = new AdvancedOutputDataHandling();
- dataHandling.setOutputDataDir("/tmp/airavata/output/"+UUID.randomUUID().toString()+"/");
+ dataHandling.setOutputDataDir("/tmp/airavata/output/" + UUID.randomUUID().toString() + "/");
userConfigurationData.setAdvanceOutputDataHandling(dataHandling);
simpleExperiment.setUserConfigurationData(userConfigurationData);
-
+
return client.createExperiment(DEFAULT_GATEWAY, simpleExperiment);
}
}
@@ -357,25 +353,24 @@ public class CreateLaunchExperiment {
}
return null;
}
-
-
+
+
public static String createMPIExperimentForFSD(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(mpiAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(mpiAppId);
for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Sample_Input")) {
- inputDataObjectType.setValue("");
- }
+ if (inputDataObjectType.getName().equalsIgnoreCase("Sample_Input")) {
+ inputDataObjectType.setValue("");
+ }
}
List<OutputDataObjectType> exOut = client.getApplicationOutputs(mpiAppId);
-
- Experiment simpleExperiment =
+
+ Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "mpiExperiment", "HelloMPI", mpiAppId, null);
- simpleExperiment.setExperimentOutputs(exOut);
-
-
-
+ simpleExperiment.setExperimentOutputs(exOut);
+
+
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(mpiAppId);
if (computeResources != null && computeResources.size() != 0) {
for (String id : computeResources.keySet()) {
@@ -386,13 +381,13 @@ public class CreateLaunchExperiment {
userConfigurationData.setAiravataAutoSchedule(false);
userConfigurationData.setOverrideManualScheduledParams(false);
userConfigurationData.setComputationalResourceScheduling(scheduling);
-
+
// set output directory
AdvancedOutputDataHandling dataHandling = new AdvancedOutputDataHandling();
- dataHandling.setOutputDataDir("/tmp/airavata/output/"+UUID.randomUUID().toString()+"/");
+ dataHandling.setOutputDataDir("/tmp/airavata/output/" + UUID.randomUUID().toString() + "/");
userConfigurationData.setAdvanceOutputDataHandling(dataHandling);
simpleExperiment.setUserConfigurationData(userConfigurationData);
-
+
return client.createExperiment(DEFAULT_GATEWAY, simpleExperiment);
}
}
@@ -413,15 +408,13 @@ public class CreateLaunchExperiment {
return null;
}
-
-
-
+
public static String createExperimentWRFStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId);
setWRFInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(wrfAppId);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", wrfAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -458,26 +451,26 @@ public class CreateLaunchExperiment {
}
- private static void setWRFInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Config_Namelist_File")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/namelist.input");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Initial_Conditions")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfinput_d01");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Boundary_File")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfbdy_d01");
- }
- }
- }
+ private static void setWRFInputs(List<InputDataObjectType> exInputs) {
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Config_Namelist_File")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/namelist.input");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Initial_Conditions")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfinput_d01");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("WRF_Boundary_File")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/WRF_FILES/wrfbdy_d01");
+ }
+ }
+ }
public static String createExperimentGROMACSStampede(Airavata.Client client) throws TException {
try {
-
- List<InputDataObjectType> exInputs = client.getApplicationInputs(gromacsAppId);
- setGROMACSInputs(exInputs);
+
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(gromacsAppId);
+ setGROMACSInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(gromacsAppId);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "GromacsExperiment", "Testing", gromacsAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -514,20 +507,21 @@ public class CreateLaunchExperiment {
}
private static void setGROMACSInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("GROMOS_Coordinate_File")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.gro");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("Portable_Input_Binary_File")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.tpr");
- }
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("GROMOS_Coordinate_File")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.gro");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Portable_Input_Binary_File")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/GROMMACS_FILES/pdb1y6l-EM-vacuum.tpr");
+ }
+ }
+ }
+
public static String createExperimentESPRESSOStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(espressoAppId);
- setESPRESSOInputs(exInputs);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(espressoAppId);
+ setESPRESSOInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(espressoAppId);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "EspressoExperiment", "Testing", espressoAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -562,22 +556,23 @@ public class CreateLaunchExperiment {
}
return null;
}
+
private static void setESPRESSOInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("AI_Pseudopotential_File")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.sample.in");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("AI_Primitive_Cell")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.pz-vbc.UPF");
- }
- }
-}
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("AI_Pseudopotential_File")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.sample.in");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("AI_Primitive_Cell")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/ESPRESSO_FILES/Al.pz-vbc.UPF");
+ }
+ }
+ }
public static String createExperimentTRINITYStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(trinityAppId);
- setTRINITYInputs(exInputs);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(trinityAppId);
+ setTRINITYInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(trinityAppId);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "TrinityExperiment", "Testing", trinityAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -612,21 +607,23 @@ public class CreateLaunchExperiment {
}
return null;
}
+
private static void setTRINITYInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Left_Input")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.left.fq");
- }else if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Right_Input")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.right.fq");
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Left_Input")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.left.fq");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("RNA_Seq_Right_Input")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/TRINITY_FILES/reads.right.fq");
+ }
+ }
}
+
public static String createExperimentLAMMPSStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId);
- setLAMMPSInputs(exInputs);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId);
+ setLAMMPSInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(lammpsAppId);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "LAMMPSExperiment", "Testing", lammpsAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -661,17 +658,19 @@ public class CreateLaunchExperiment {
}
return null;
}
+
private static void setLAMMPSInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Friction_Simulation_Input")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/LAMMPS_FILES/in.friction");
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Friction_Simulation_Input")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/LAMMPS_FILES/in.friction");
+ }
+ }
}
+
public static String createExperimentNWCHEMStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId);
- setNWCHEMInputs(exInputs);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId);
+ setNWCHEMInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(nwchemAppId);
Experiment simpleExperiment =
@@ -708,17 +707,19 @@ public class CreateLaunchExperiment {
}
return null;
}
+
private static void setNWCHEMInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Water_Molecule_Input")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/NWCHEM_FILES/water.nw");
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Water_Molecule_Input")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/NWCHEM_FILES/water.nw");
+ }
+ }
}
+
public static String createExperimentAUTODOCKStampede(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId);
- setAUTODOCKInputs(exInputs);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(nwchemAppId);
+ setAUTODOCKInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(nwchemAppId);
Experiment simpleExperiment =
@@ -755,43 +756,45 @@ public class CreateLaunchExperiment {
}
return null;
}
+
private static void setAUTODOCKInputs(List<InputDataObjectType> exInputs) {
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("AD4_parameters.dat")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/AD4_parameters.dat");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.A.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.A.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.C.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.C.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.d.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.d.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.e.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.e.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.HD.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.HD.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.maps.fld")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.maps.fld");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.NA.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.NA.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.N.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.N.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.OA.map")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.OA.map");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.dpf")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.dpf");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.pdbqt")) {
- inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.pdbqt");
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("AD4_parameters.dat")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/AD4_parameters.dat");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.A.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.A.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.C.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.C.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.d.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.d.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.e.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.e.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.HD.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.HD.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.maps.fld")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.maps.fld");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.NA.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.NA.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.N.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.N.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("hsg1.OA.map")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/hsg1.OA.map");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.dpf")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.dpf");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("ind.pdbqt")) {
+ inputDataObjectType.setValue("/Users/shameera/Downloads/PHP-Gateway-Scripts/appScripts/AUTODOCK_FILES/ind.pdbqt");
+ }
+ }
}
+
public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(wrfAppId);
setWRFInputs(exInputs);
List<OutputDataObjectType> exOut = client.getApplicationOutputs(wrfAppId);
-
- Experiment simpleExperiment =
+
+ Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", wrfAppId, exInputs);
simpleExperiment.setExperimentOutputs(exOut);
@@ -952,11 +955,11 @@ public class CreateLaunchExperiment {
try {
List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
- inputDataObjectType.setValue("Hello World");
- }
- }
- List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId);
+ if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
+ inputDataObjectType.setValue("Hello World");
+ }
+ }
+ List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId);
Project project = ProjectModelUtil.createProject("default", "admin", "test project");
String projectId = client.createProject(DEFAULT_GATEWAY, project);
@@ -1068,13 +1071,13 @@ public class CreateLaunchExperiment {
public static String createExperimentForBR2(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
- inputDataObjectType.setValue("Hello World");
- }
- }
- List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(echoAppId);
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Input_to_Echo")) {
+ inputDataObjectType.setValue("Hello World");
+ }
+ }
+ List<OutputDataObjectType> exOut = client.getApplicationOutputs(echoAppId);
Project project = ProjectModelUtil.createProject("default", "lahiru", "test project");
@@ -1163,6 +1166,7 @@ public class CreateLaunchExperiment {
}
return null;
}
+
public static String createExperimentLAMMPSForLSF(Airavata.Client client) throws TException {
try {
List<InputDataObjectType> exInputs = client.getApplicationInputs(lammpsAppId);
@@ -1225,7 +1229,7 @@ public class CreateLaunchExperiment {
public static String createExperimentForBR2Amber(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
// for (InputDataObjectType inputDataObjectType : exInputs) {
// if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
// inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst");
@@ -1236,17 +1240,17 @@ public class CreateLaunchExperiment {
// }
//
// }
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
- }
- }
-
- List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
+ }
+ }
+
+ List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
Project project = ProjectModelUtil.createProject("default", "admin", "test project");
String projectId = client.createProject(DEFAULT_GATEWAY, project);
@@ -1289,7 +1293,7 @@ public class CreateLaunchExperiment {
public static String createExperimentForStampedeAmber(Airavata.Client client) throws TException {
try {
- List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
// for (InputDataObjectType inputDataObjectType : exInputs) {
// if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
// inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst");
@@ -1300,17 +1304,17 @@ public class CreateLaunchExperiment {
// }
//
// }
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
- }
- }
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
+ inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
+ }
+ }
- List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
+ List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
Project project = ProjectModelUtil.createProject("default", "admin", "test project");
@@ -1353,8 +1357,8 @@ public class CreateLaunchExperiment {
public static String createExperimentForTrestlesAmber(Airavata.Client client) throws TException {
try {
-
- List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
+
+ List<InputDataObjectType> exInputs = client.getApplicationInputs(amberAppId);
// for (InputDataObjectType inputDataObjectType : exInputs) {
// if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
// inputDataObjectType.setValue("/Users/raminder/Documents/Sample/Amber/02_Heat.rst");
@@ -1365,16 +1369,16 @@ public class CreateLaunchExperiment {
// }
//
// }
- for (InputDataObjectType inputDataObjectType : exInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in");
- } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop");
- }
- }
- List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
+ for (InputDataObjectType inputDataObjectType : exInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
+ inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
+ inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in");
+ } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
+ inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop");
+ }
+ }
+ List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
Project project = ProjectModelUtil.createProject("default", "admin", "test project");
String projectId = client.createProject(DEFAULT_GATEWAY, project);
@@ -1418,7 +1422,7 @@ public class CreateLaunchExperiment {
public static void launchExperiment(Airavata.Client client, String expId)
throws TException {
try {
- String tokenId ="-0bbb-403b-a88a-42b6dbe198e9";
+ String tokenId = "-0bbb-403b-a88a-42b6dbe198e9";
client.launchExperiment(expId, tokenId);
} catch (ExperimentNotFoundException e) {
logger.error("Error occured while launching the experiment...", e.getMessage());
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 46f06f1..a0cc142 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -64,6 +64,10 @@ public class AiravataZKUtils {
+ ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT,"2181");
}
+ public static int getZKTimeout()throws ApplicationSettingsException {
+ return Integer.parseInt(ServerSettings.getSetting(Constants.ZOOKEEPER_TIMEOUT,"30000"));
+ }
+
public static String getExpStatePath(String experimentId, String taskId) throws ApplicationSettingsException {
return AiravataZKUtils.getExpZnodePath(experimentId, taskId) +
File.separator +
@@ -88,6 +92,7 @@ public class AiravataZKUtils {
return null;
}
+
public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
KeeperException, InterruptedException {
Stat exists = zk.exists(fullPath, false);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index 154bea1..391a3c6 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -57,4 +57,5 @@ public final class Constants {
public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
public static final String STAT = "stat";
public static final String JOB = "job";
+ public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index c65feeb..2d8bc48 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -214,20 +214,19 @@ amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
#publisher
+rabbitmq.broker.url=amqp://localhost:5672
+
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
publish.rabbitmq=false
-<<<<<<< HEAD
status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
-rabbitmq.broker.url=amqp://localhost:5672
rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
-=======
+
+
activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
-rabbitmq.broker.url=amqp://gw111.iu.xsede.org:5672
rabbitmq.exchange.name=airavata_rabbitmq_exchange
->>>>>>> master
###########################################################################
# Orchestrator module Configuration
@@ -252,6 +251,7 @@ embedded.zk=true
zookeeper.server.host=localhost
zookeeper.server.port=2181
airavata-server=/api-server
+zookeeper.timeout=30000
orchestrator-server=/orchestrator-server
gfac-server=/gfac-server
gfac-experiments=/gfac-experiments
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties
index 64e0160..f2e8d82 100644
--- a/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties
+++ b/modules/credential-store/credential-store-webapp/src/main/resources/airavata-server.properties
@@ -228,6 +228,7 @@ orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
embedded.zk=true
zookeeper.server.host=localhost
zookeeper.server.port=2181
+zookeeper.timeout=30000
airavata-server=/api-server
orchestrator-server=/orchestrator-server
gfac-server=/gfac-server
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 88979a4..d45710e 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -66,6 +66,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ private static int requestCount=0;
+
private Registry registry;
private AppCatalog appCatalog;
@@ -96,7 +98,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
String zkhostPort = AiravataZKUtils.getZKhostPort();
airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
- zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
synchronized (mutex) {
@@ -110,14 +112,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
appCatalog = AppCatalogFactory.getAppCatalog();
setGatewayProperties();
BetterGfacImpl.startDaemonHandlers();
- BetterGfacImpl.startStatusUpdators(registry, zk, publisher);
- inHandlerFutures = new ArrayList<Future>();
if (ServerSettings.isGFacPassiveMode()) {
rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
-
}
+ BetterGfacImpl.startStatusUpdators(registry, zk, publisher, rabbitMQTaskLaunchConsumer);
+ inHandlerFutures = new ArrayList<Future>();
+
} catch (ApplicationSettingsException e) {
logger.error("Error initialising GFAC", e);
throw new Exception("Error initialising GFAC", e);
@@ -182,7 +184,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
break;
case Expired:case Disconnected:
try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
@@ -231,6 +233,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
* @param gatewayId
*/
public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException {
+ requestCount++;
+ logger.info("-----------------------------------------------------" + requestCount+"-----------------------------------------------------");
logger.infoId(experimentId, "GFac Received submit jog request for the Experiment: {} TaskId: {}", experimentId, taskId);
GFac gfac = getGfac();
InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 7559878..3cd07c6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -49,6 +49,7 @@ import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
@@ -111,7 +112,7 @@ public class BetterGfacImpl implements GFac,Watcher {
this.appCatalog = appCatalog;
}
- public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
+ public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
Publisher rabbitMQPublisher = null;
@@ -122,7 +123,7 @@ public class BetterGfacImpl implements GFac,Watcher {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher);
+ abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 26902e7..eaa3c5f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -30,6 +30,9 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQProducer;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -44,19 +47,21 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
private static Integer mutex = -1;
+ private RabbitMQTaskLaunchConsumer consumer;
+
@Subscribe
public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception {
logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
MonitorID monitorID = statusChangeRequest.getMonitorID();
- String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments") +
- File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID();
- String deliveryTagPath = experimentPath + GFacUtils.DELIVERY_TAG_POSTFIX;
+ String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+ + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID();
Stat exists = null;
try {
if (!zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
- synchronized (mutex){
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+ synchronized (mutex) {
mutex.wait();
}
}
@@ -77,11 +82,11 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
throw new Exception(e.getMessage(), e);
}
Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
- if(state == null) {
+ if (state == null) {
// state znode has to be created
zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }else {
+ } else {
zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
}
@@ -89,12 +94,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
case COMPLETED:
logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
logger.info("Zookeeper experiment Path: " + experimentPath);
+ if (ServerSettings.isGFacPassiveMode()) {
+ consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
+ monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
+ }
ZKUtil.deleteRecursive(zk, experimentPath);
break;
case FAILED:
logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
logger.info("Zookeeper experiment Path: " + experimentPath);
- ZKUtil.deleteRecursive(zk,experimentPath);
+ if (ServerSettings.isGFacPassiveMode()) {
+ consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
+ monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
+ }
+ ZKUtil.deleteRecursive(zk, experimentPath);
break;
default:
}
@@ -105,6 +118,9 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
if (configuration instanceof ZooKeeper) {
this.zk = (ZooKeeper) configuration;
}
+ if (configuration instanceof RabbitMQTaskLaunchConsumer) {
+ this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 55da288..aefe490 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -94,10 +94,11 @@ public class MonitorID {
experimentID = jobExecutionContext.getExperiment().getExperimentID();
workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
try {
- jobID = jobExecutionContext.getJobDetails().getJobID();
jobName = jobExecutionContext.getJobDetails().getJobName();
+ jobID = jobExecutionContext.getJobDetails().getJobID();
}catch(NullPointerException e){
logger.error("There is not job created at this point");
+ // this is not a big deal; we create the MonitorID before having a job ID or job name
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 83928c3..707cf97 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1159,29 +1159,13 @@ public class GFacUtils {
public static boolean createExperimentEntryForPassive(String experimentID,
String taskID, ZooKeeper zk, String experimentNode,
String pickedChild, String tokenId,long deliveryTag) throws KeeperException,
- InterruptedException {
+ InterruptedException, ApplicationSettingsException {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID
+ "+" + taskID;
Stat exists1 = zk.exists(newExpNode, false);
String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
- String foundExperimentPath = null;
if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment
- List<String> runningGfacNodeNames = AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers
-
- for (String gfacServerNode : runningGfacNodeNames) {
- if (!gfacServerNode.equals(pickedChild)) {
- foundExperimentPath = experimentNode + File.separator
- + gfacServerNode + File.separator + experimentID
- + "+" + taskID;
- exists1 = zk.exists(foundExperimentPath, true);
- if (exists1 != null) { // when the experiment is found we
- // break the loop
- break;
- }
- }
- }
- if (exists1 == null) { // OK this is a pretty new experiment so we
// are going to create a new node
log.info("This is a new Job, so creating all the experiment docs from the scratch");
Stat expParent = zk.exists(newExpNode, false);
@@ -1199,34 +1183,35 @@ public class GFacUtils {
String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk.exists(s1, true);// we want to know when this node get deleted
- String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
+ String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
CreateMode.PERSISTENT);
- } else {
- // ohhh this node exists in some other failed gfac folder, we
- // have to move it to this gfac experiment list,safely
+ }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
+ // this happens when a cancel request comes to a different gfac node, in this case we do not move gfac experiment
+ // node to gfac node specific location, because original request execution will fail with errors
+ log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
+ return false;
+ } else if(experimentEntry != null && !GFacUtils.isCancelled(experimentID,taskID,zk)){
+ if(ServerSettings.isGFacPassiveMode()){
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+ + " was running by some Gfac instance,but it failed");
log.info("This is an old Job, so copying data from old experiment location");
zk.create(newExpNode,
- zk.getData(foundExperimentPath, false, exists1),
+ zk.getData(experimentEntry, false, exists1),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- List<String> children = zk.getChildren(foundExperimentPath,
+ List<String> children = zk.getChildren(experimentEntry,
false);
for (String childNode1 : children) {
- String level1 = foundExperimentPath + File.separator
+ String level1 = experimentEntry + File.separator
+ childNode1;
- Stat exists2 = zk.exists(level1, false); // no need to check
- // exists
+ Stat exists2 = zk.exists(level1, false); // no need to check exists
String newLeve1 = newExpNode + File.separator + childNode1;
- log.info("Creating new znode: " + newLeve1); // these has to
- // be info
- // logs
+ log.info("Creating new znode: " + newLeve1); // these has to be info logs
zk.create(newLeve1, zk.getData(level1, false, exists2),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
for (String childNode2 : zk.getChildren(level1, false)) {
String level2 = level1 + File.separator + childNode2;
- Stat exists3 = zk.exists(level2, false); // no need to
- // check
- // exists
+ Stat exists3 = zk.exists(level2, false); // no need to check exists
String newLeve2 = newLeve1 + File.separator
+ childNode2;
log.info("Creating new znode: " + newLeve2);
@@ -1239,40 +1224,47 @@ public class GFacUtils {
// old experiment,otherwise we do
// not delete a single file
log.info("After a successful copying of experiment data for an old experiment we delete the old data");
- log.info("Deleting experiment data: " + foundExperimentPath);
- ZKUtil.deleteRecursive(zk, foundExperimentPath);
- }
- }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
- // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment
- // node to gfac node specific location, because original request execution will fail with errors
- log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
- return false;
- } else {
- log.error("ExperimentID: " + experimentID + " taskID: " + taskID
- + " is already running by this Gfac instance");
- List<String> runningGfacNodeNames = AiravataZKUtils
- .getAllGfacNodeNames(zk); // here we take old gfac servers
- // too
- for (String gfacServerNode : runningGfacNodeNames) {
- if (!gfacServerNode.equals(pickedChild)) {
- foundExperimentPath = experimentNode + File.separator
- + gfacServerNode + File.separator + experimentID
- + "+" + taskID;
- break;
+ log.info("Deleting experiment data: " + experimentEntry);
+ ZKUtil.deleteRecursive(zk, experimentEntry);
+ }else {
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+ + " is already running by this Gfac instance");
+ List<String> runningGfacNodeNames = AiravataZKUtils
+ .getAllGfacNodeNames(zk); // here we take old gfac servers
+ // too
+ for (String gfacServerNode : runningGfacNodeNames) {
+ if (!gfacServerNode.equals(pickedChild)) {
+ experimentEntry = experimentNode + File.separator
+ + gfacServerNode + File.separator + experimentID
+ + "+" + taskID;
+ break;
+ }
+ }
+ if(experimentEntry!=null) {
+ ZKUtil.deleteRecursive(zk, experimentEntry);
}
}
- ZKUtil.deleteRecursive(zk, foundExperimentPath);
+
}
return true;
}
+ /**
+ * This will return a value if the server is down because we iterate through existing experiment nodes, not
+ * through gfac-server nodes
+ * @param experimentID
+ * @param taskID
+ * @param zk
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
public static String findExperimentEntry(String experimentID,
String taskID, ZooKeeper zk
) throws KeeperException,
InterruptedException {
- String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(gfacServer, false);
+ List<String> children = zk.getChildren(experimentNode, false);
for(String pickedChild:children) {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID
@@ -1341,6 +1333,18 @@ public class GFacUtils {
}
}
+ public static long getDeliveryTag(String experimentID, // reads the delivery tag stored in ZooKeeper for the given experiment/task pair
+ String taskID, ZooKeeper zk, String experimentNode, // experimentNode: root znode path under which pickedChild lives
+ String pickedChild) throws KeeperException, InterruptedException,GFacException { // GFacException is raised when the tag znode is missing
+ String experimentPath = experimentNode + File.separator + pickedChild; // NOTE(review): File.separator is platform dependent while ZooKeeper paths use '/' - confirm this only runs where the separator is '/'
+ String deliveryTagPath = experimentPath + File.separator + experimentID
+ + "+" + taskID + DELIVERY_TAG_POSTFIX; // i.e. {experimentNode}/{pickedChild}/{experimentID}+{taskID}{DELIVERY_TAG_POSTFIX}
+ Stat exists = zk.exists(deliveryTagPath, false); // no watch is registered; a null Stat means the znode does not exist
+ if(exists==null) {
+ throw new GFacException("Cannot find delivery Tag for this experiment");
+ }
+ return bytesToLong(zk.getData(deliveryTagPath, false, exists)); // the znode payload is decoded as a long via bytesToLong
+ }
public static String getPluginData(JobExecutionContext jobExecutionContext,
String className) throws ApplicationSettingsException,
KeeperException, InterruptedException {
@@ -1452,4 +1456,17 @@ public class GFacUtils {
return sb.toString();
}
+
+ public static byte[] longToBytes(long x) { // encodes x as a Long.BYTES-long byte array; counterpart of bytesToLong
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); // heap buffer sized for exactly one long
+ buffer.putLong(x); // written in the buffer's default byte order (big-endian per the ByteBuffer documentation)
+ return buffer.array(); // returns the buffer's backing array directly, without copying
+ }
+
+ public static long bytesToLong(byte[] bytes) { // decodes a long from bytes; counterpart of longToBytes, so exactly Long.BYTES bytes are expected
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.put(bytes); // ByteBuffer.put throws BufferOverflowException when bytes is longer than Long.BYTES
+ buffer.flip();// switch the buffer from writing to reading: limit becomes the number of bytes written, position resets to 0
+ return buffer.getLong(); // throws BufferUnderflowException when fewer than Long.BYTES bytes were supplied
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 9f055e9..d62d3d7 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -124,9 +124,9 @@ public class LocalProvider extends AbstractProvider {
// log info
log.info("Command = " + InputUtils.buildCommand(cmdList));
log.info("Working dir = " + builder.directory());
- for (String key : builder.environment().keySet()) {
+ /*for (String key : builder.environment().keySet()) {
log.info("Env[" + key + "] = " + builder.environment().get(key));
- }
+ }*/
}
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index cbac726..15b7241 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -199,7 +199,7 @@ public class CommonUtils {
if (zk == null || !zk.getState().isConnected()) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), new Watcher() {
@Override
public void process(WatchedEvent event) {
countDownLatch.countDown();
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 7c88a25..9cad924 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -165,14 +165,15 @@ public class RabbitMQTaskLaunchConsumer {
event = taskTerminateEvent;
gatewayId = null;
}
+ System.out.println("*deliveryTag:"+deliveryTag);
MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
handler.onMessage(messageContext);
- try {
+ /*try {
channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
} catch (IOException e) {
logger.error(e.getMessage(), e);
- }
+ }*/
} catch (TException e) {
String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
log.warn(msg, e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index f430bc9..63f0d9c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -115,7 +115,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
// setGatewayName(ServerSettings.getDefaultUserGateway());
setAiravataUserName(ServerSettings.getDefaultUser());
try {
- zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is
// required, this
// will only use to
// store some data
@@ -303,7 +303,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
case Expired:
case Disconnected:
try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index f19b949..993a303 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -70,7 +70,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
*/
public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 8066113..c17638b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -105,42 +105,30 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait();
}
}
- String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
- List<String> children = zk.getChildren(gfacServer, this);
-
- if (children.size() == 0) {
- // Zookeeper data need cleaning
- throw new OrchestratorException("There is no active GFac instance to route the request");
- } else {
- String gatewayId = null;
- CredentialReader credentialReader = GFacUtils.getCredentialReader();
- if (credentialReader != null) {
- try {
- gatewayId = credentialReader.getGatewayID(tokenId);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
+ String gatewayId = null;
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ if (credentialReader != null) {
+ try {
+ gatewayId = credentialReader.getGatewayID(tokenId);
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
}
- if(gatewayId == null || gatewayId.isEmpty()){
- gatewayId = ServerSettings.getDefaultUserGateway();
- }
-
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId,tokenId);
- MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- publisher.publish(messageContext);
}
+ if (gatewayId == null || gatewayId.isEmpty()) {
+ gatewayId = ServerSettings.getDefaultUserGateway();
+ }
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId);
+ MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
- } catch (KeeperException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
} catch (ApplicationSettingsException e) {
logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
@@ -167,7 +155,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cb6c4ccf/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
index b855de2..5a1be5a 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
@@ -78,7 +78,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait();
}
@@ -146,7 +146,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, 6000, this);
+ zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
synchronized (mutex) {
mutex.wait();
}