You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 20:05:50 UTC
[3/5] git commit: more zk related changes
more zk related changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e86504aa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e86504aa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e86504aa
Branch: refs/heads/master
Commit: e86504aae30401d109690d8814cbf4bcc6f77ae9
Parents: 362da4e
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 13:46:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 13:46:33 2014 -0400
----------------------------------------------------------------------
.../airavata/client/tools/DocumentCreator.java | 2 +-
modules/commons/utils/pom.xml | 12 +-
.../airavata/common/utils/AiravataZKUtils.java | 59 +++
.../airavata/gfac/server/GfacServerHandler.java | 13 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 399 +++++++++-----
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 3 +-
.../core/monitor/GfacInternalStatusUpdator.java | 24 +-
.../state/GfacExperimentStateChangeRequest.java | 2 +-
.../airavata/gfac/core/utils/GFacUtils.java | 21 +
.../gfac/core/utils/GfacExperimentState.java | 81 +++
.../experiment/GfacExperimentState.java | 82 ---
.../experiment/GfacExperimentStatus.java | 516 -------------------
.../experiment/gfacDataModelConstants.java | 59 ---
.../server/OrchestratorServerHandler.java | 5 +-
.../core/impl/GFACServiceJobSubmitter.java | 6 +
15 files changed, 483 insertions(+), 801 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index ffcff17..d573da9 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
- applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
+ applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 151a141..aca8fea 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -8,7 +8,8 @@
ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.airavata</groupId>
@@ -96,7 +97,7 @@
<artifactId>tomcat-embed-core</artifactId>
<version>7.0.22</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
@@ -107,7 +108,7 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
@@ -122,6 +123,11 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
new file mode 100644
index 0000000..7349b7e
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.common.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.File;
+
+public class AiravataZKUtils {
+ public static final String ZK_EXPERIMENT_STATE_NODE = "state";
+
+ public static String getExpZnodePath(String experimentId, String taskId) throws ApplicationSettingsException {
+ return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) +
+ File.separator +
+ ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator
+ + experimentId + "+" + taskId;
+ }
+
+ public static String getZKhostPort() throws ApplicationSettingsException {
+ return ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT);
+ }
+
+ public static String getExpStatePath(String experimentId, String taskId) throws ApplicationSettingsException {
+ return AiravataZKUtils.getExpZnodePath(experimentId, taskId) +
+ File.separator +
+ "state";
+ }
+
+ public static String getExpState(ZooKeeper zk,String expId,String tId) throws ApplicationSettingsException,
+ KeeperException, InterruptedException {
+ Stat exists = zk.exists(getExpStatePath(expId, tId), false);
+ if(exists != null) {
+ return new String(zk.getData(getExpStatePath(expId, tId),false, exists));
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 27733f9..60a0f21 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.server;
import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.exception.AiravataConfigurationException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.GFacException;
@@ -74,8 +75,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
public GfacServerHandler() {
// registering with zk
try {
- String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
- + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String zkhostPort = AiravataZKUtils.getZKhostPort();
String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
try {
@@ -140,6 +140,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
if (state == Event.KeeperState.SyncConnected) {
mutex.notify();
connected = true;
+ } else if(state == Event.KeeperState.Expired ||
+ state == Event.KeeperState.Disconnected){
+ try {
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(),6000,this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 195bfc1..6fb3e24 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -28,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import com.google.common.eventbus.EventBus;
-
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
@@ -59,12 +57,12 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,12 +86,12 @@ public class BetterGfacImpl implements GFac {
private AiravataRegistry2 airavataRegistry2;
private ZooKeeper zk; // we are not storing zk instance in to jobExecution context
-
+
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static File gfacConfigFile;
- private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
+ private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private static MonitorPublisher monitorPublisher;
@@ -114,35 +112,36 @@ public class BetterGfacImpl implements GFac {
this.zk = zooKeeper;
}
- public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+ public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry,zk);
+ abstractActivityListener.setup(publisher, registry, zk);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
- }catch (ClassNotFoundException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (ClassNotFoundException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
} catch (InstantiationException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
} catch (IllegalAccessException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
- } catch (ApplicationSettingsException e){
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
+ } catch (ApplicationSettingsException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
}
}
- public static void startDaemonHandlers() {
+
+ public static void startDaemonHandlers() {
List<GFacHandlerConfig> daemonHandlerConfig = null;
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
gfacConfigFile = new File(resource.getPath());
try {
daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
} catch (ParserConfigurationException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
} catch (IOException e) {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
} catch (SAXException e) {
@@ -151,14 +150,14 @@ public class BetterGfacImpl implements GFac {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
}
- for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+ for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
String className = handlerConfig.getClassName();
try {
Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
threadedHandler.initProperties(handlerConfig.getProperties());
daemonHandlers.add(threadedHandler);
- }catch (ClassNotFoundException e){
+ } catch (ClassNotFoundException e) {
log.error("Error initializing the handler: " + className);
log.error(className + " class has to implement " + ThreadedHandler.class);
} catch (InstantiationException e) {
@@ -173,7 +172,7 @@ public class BetterGfacImpl implements GFac {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
- for(ThreadedHandler tHandler:daemonHandlers){
+ for (ThreadedHandler tHandler : daemonHandlers) {
(new Thread(tHandler)).start();
}
}
@@ -194,7 +193,7 @@ public class BetterGfacImpl implements GFac {
* @return
* @throws GFacException
*/
- public boolean submitJob(String experimentID,String taskID) throws GFacException {
+ public boolean submitJob(String experimentID, String taskID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID);
@@ -216,31 +215,31 @@ public class BetterGfacImpl implements GFac {
// 2. Add another property to jobExecutionContext and read them inside the provider and use it.
String serviceName = taskData.getApplicationId();
if (serviceName == null) {
- throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName);
}
-
+
ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
- if (serviceDescription == null ) {
- throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ if (serviceDescription == null) {
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName);
}
String hostName;
HostDescription hostDescription = null;
- if(taskData.getTaskScheduling().getResourceHostId() != null){
+ if (taskData.getTaskScheduling().getResourceHostId() != null) {
hostName = taskData.getTaskScheduling().getResourceHostId();
hostDescription = airavataRegistry2.getHostDescriptor(hostName);
- }else{
- List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
- Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
- for (String hostDescName : applicationDescriptors.keySet()) {
- registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
- }
- Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
- HostScheduler hostScheduler = aClass.newInstance();
+ } else {
+ List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+ Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+ for (String hostDescName : applicationDescriptors.keySet()) {
+ registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+ }
+ Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
hostDescription = hostScheduler.schedule(registeredHosts);
- hostName = hostDescription.getType().getHostName();
+ hostName = hostDescription.getType().getHostName();
}
- if(hostDescription == null){
- throw new GFacException("Error executing the job as the host is not registered " + hostName);
+ if (hostDescription == null) {
+ throw new GFacException("Error executing the job as the host is not registered " + hostName);
}
ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -283,6 +282,8 @@ public class BetterGfacImpl implements GFac {
public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
// We need to check whether this job is submitted as a part of a large workflow. If yes,
// we need to setup workflow tracking listerner.
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
String workflowInstanceID = null;
if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
// This mean we need to register workflow tracking listener.
@@ -291,11 +292,11 @@ public class BetterGfacImpl implements GFac {
}
// Register log event listener. This is required in all scenarios.
jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
- schedule(jobExecutionContext);
+ launch(jobExecutionContext);
return true;
}
- private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider which handles
// the job.
String experimentID = jobExecutionContext.getExperimentID();
@@ -303,27 +304,33 @@ public class BetterGfacImpl implements GFac {
Scheduler.schedule(jobExecutionContext);
// Executing in handlers in the order as they have configured in GFac configuration
- invokeInFlowHandlers(jobExecutionContext);
+ // here we do not skip handler if some handler does not have to be run again during re-run it can implement
+ // that logic in to the handler
+ int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ if(stateVal >=2){
+ reInvokeInFlowHandlers(jobExecutionContext);
+ }else {
+ invokeInFlowHandlers(jobExecutionContext); // to keep the consistency we always try to re-run to avoid complexity
+ }
// if (experimentID != null){
// registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
// }
// After executing the in handlers provider instance should be set to job execution context.
// We get the provider instance and execute it.
- GFacProvider provider = jobExecutionContext.getProvider();
- if (provider != null) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
- initProvider(provider, jobExecutionContext);
- executeProvider(provider, jobExecutionContext);
- disposeProvider(provider, jobExecutionContext);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
- }
- if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
- invokeOutFlowHandlers(jobExecutionContext);
+ stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ if (stateVal == 4) { // if the job is completed during resubmission we handle it here
+ reInvokeProvider(jobExecutionContext);
+ }else if(stateVal == 3){
+ invokeProvider(jobExecutionContext);
+ }else{
+ log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!");
+ log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
}
} catch (Exception e) {
try {
// we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
monitorPublisher.publish(new
ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
ExperimentState.FAILED));
@@ -335,9 +342,9 @@ public class BetterGfacImpl implements GFac {
));
monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED
+ ));
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
"NullPointerException occurred because at this point there might not have Job Created", e1, e);
@@ -348,6 +355,35 @@ public class BetterGfacImpl implements GFac {
}
}
+ private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ initProvider(provider, jobExecutionContext);
+ executeProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ }
+
+ private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ initProvider(provider, jobExecutionContext);
+ executeProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ }
+
+
private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
try {
provider.initialize(jobExecutionContext);
@@ -358,7 +394,7 @@ public class BetterGfacImpl implements GFac {
private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
try {
- provider.execute(jobExecutionContext);
+ provider.execute(jobExecutionContext);
} catch (Exception e) {
throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
}
@@ -383,86 +419,203 @@ public class BetterGfacImpl implements GFac {
private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- ,GfacExperimentState.INHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
- handler.invoke(jobExecutionContext);
- } catch (GFacHandlerException e) {
- throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (GFacHandlerException e) {
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ }
}
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ throw new GFacException("Error invoking ZK", e);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- ,GfacExperimentState.INHANDLERSINVOKED));
}
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- List<GFacHandlerConfig> handlers = null;
- if(gFacConfiguration != null){
- handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
- }else {
- try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- log.error("Error constructing job execution context during outhandler invocation");
- throw new GFacException(e);
+ private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+ try {
+ int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ if (stateVal == 8 || stateVal == -1) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (GFacHandlerException e) {
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ }
+ }
}
- schedule(jobExecutionContext);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ throw new GFacException("Error invoking ZK", e);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+
+ public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ int stateVal = -1;
+ try {
+ stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (stateVal >= 0 && stateVal < 6) {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ launch(jobExecutionContext);
}
- try {
- handler.invoke(jobExecutionContext);
- } catch (Exception e) {
- // TODO: Better error reporting.
- throw new GFacException("Error Executing a OutFlow Handler", e);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
+ }
- // At this point all the execution is finished so we update the task and experiment statuses.
- // Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
- // Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
- ));
-
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ int stateVal = -1;
+ try {
+ stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (stateVal >= 0 && stateVal < 6) {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ launch(jobExecutionContext);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ }
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index a6908ba..5bc789c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,12 +58,11 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 24b3989..3933976 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -22,14 +22,11 @@ package org.apache.airavata.gfac.core.monitor;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +39,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
private ZooKeeper zk;
- private Integer mutex = -1;
+ private static Integer mutex = -1;
@Subscribe
public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws KeeperException, InterruptedException, ApplicationSettingsException {
@@ -73,16 +70,23 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
} catch (IOException e) {
e.printStackTrace();
}
+ Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if(state == null) {
+ // state znode has to be created
+ zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }else {
+ zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+ }
switch (statusChangeRequest.getState()) {
case COMPLETED:
- zk.delete(experimentPath, exists.getVersion());
+// ZKUtil.deleteRecursive(zk,experimentPath);
break;
case FAILED:
- zk.delete(experimentPath, exists.getVersion());
+ ZKUtil.deleteRecursive(zk,experimentPath);
break;
default:
- zk.setData(experimentPath, (statusChangeRequest.getMonitorID().getJobID() +
- "," + statusChangeRequest.getMonitorID().getWorkflowNodeID()).getBytes(), exists.getVersion());
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index 50a60de..5f7f2c2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.core.monitor.state;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
public class GfacExperimentStateChangeRequest {
private GfacExperimentState state;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 7013f3e..f67b592 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -32,6 +32,8 @@ import java.util.*;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.Constants;
@@ -48,6 +50,8 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.*;
import org.apache.axiom.om.OMElement;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -658,5 +662,22 @@ public class GFacUtils {
return stringObjectHashMap;
}
+ public static GfacExperimentState getZKExperimentState(ZooKeeper zk,JobExecutionContext jobExecutionContext)
+ throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID());
+ return GfacExperimentState.findByValue(Integer.parseInt(expState));
+ }
+
+ public static int getZKExperimentStateValue(ZooKeeper zk, JobExecutionContext jobExecutionContext)
+ throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID());
+ if(expState == null){
+ return -1;
+ }
+ return Integer.parseInt(expState);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
new file mode 100644
index 0000000..db2cab0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.gfac.core.utils;
+
+
+public enum GfacExperimentState {
+ LAUNCHED(0),
+ ACCEPTED(1),
+ INHANDLERSINVOKING(2),
+ INHANDLERSINVOKED(3),
+ PROVIDERINVOKING(4),
+ PROVIDERINVOKED(5),
+ OUTHANDLERSINVOKING(6),
+ OUTHANDLERSINVOKED(7),
+ COMPLETED(8),
+ FAILED(9),
+ UNKNOWN(10);
+
+ private final int value;
+
+ private GfacExperimentState(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ *
+ * @return null if the value is not found.
+ */
+ public static GfacExperimentState findByValue(int value) {
+ switch (value) {
+ case 0:
+ return INHANDLERSINVOKING;
+ case 1:
+ return INHANDLERSINVOKED;
+ case 2:
+ return PROVIDERINVOKING;
+ case 3:
+ return PROVIDERINVOKED;
+ case 4:
+ return OUTHANDLERSINVOKING;
+ case 5:
+ return OUTHANDLERSINVOKED;
+ case 6:
+ return COMPLETED;
+ case 7:
+ return FAILED;
+ case 8:
+ return UNKNOWN;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
deleted file mode 100644
index 8d06aad..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
+++ /dev/null
@@ -1,82 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
-@SuppressWarnings("all") public enum GfacExperimentState implements org.apache.thrift.TEnum {
- INHANDLERSINVOKING(0),
- INHANDLERSINVOKED(1),
- PROVIDERINVOKING(2),
- PROVIDERINVOKED(3),
- OUTHANDLERSINVOKING(4),
- OUTHANDLERSINVOKED(5),
- COMPLETED(6),
- FAILED(7),
- UNKNOWN(8);
-
- private final int value;
-
- private GfacExperimentState(int value) {
- this.value = value;
- }
-
- /**
- * Get the integer value of this enum value, as defined in the Thrift IDL.
- */
- public int getValue() {
- return value;
- }
-
- /**
- * Find a the enum type by its integer value, as defined in the Thrift IDL.
- * @return null if the value is not found.
- */
- public static GfacExperimentState findByValue(int value) {
- switch (value) {
- case 0:
- return INHANDLERSINVOKING;
- case 1:
- return INHANDLERSINVOKED;
- case 2:
- return PROVIDERINVOKING;
- case 3:
- return PROVIDERINVOKED;
- case 4:
- return OUTHANDLERSINVOKING;
- case 5:
- return OUTHANDLERSINVOKED;
- case 6:
- return COMPLETED;
- case 7:
- return FAILED;
- case 8:
- return UNKNOWN;
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
deleted file mode 100644
index 3b9273d..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
+++ /dev/null
@@ -1,516 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class GfacExperimentStatus implements org.apache.thrift.TBase<GfacExperimentStatus, GfacExperimentStatus._Fields>, java.io.Serializable, Cloneable, Comparable<GfacExperimentStatus> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GfacExperimentStatus");
-
- private static final org.apache.thrift.protocol.TField GFAC_EXPERIMENT_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("gfacExperimentState", org.apache.thrift.protocol.TType.I32, (short)1);
- private static final org.apache.thrift.protocol.TField TIME_OF_STATE_CHANGE_FIELD_DESC = new org.apache.thrift.protocol.TField("timeOfStateChange", org.apache.thrift.protocol.TType.I64, (short)2);
-
- private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
- static {
- schemes.put(StandardScheme.class, new GfacExperimentStatusStandardSchemeFactory());
- schemes.put(TupleScheme.class, new GfacExperimentStatusTupleSchemeFactory());
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentState gfacExperimentState; // required
- public long timeOfStateChange; // optional
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- /**
- *
- * @see GfacExperimentState
- */
- GFAC_EXPERIMENT_STATE((short)1, "gfacExperimentState"),
- TIME_OF_STATE_CHANGE((short)2, "timeOfStateChange");
-
- private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
- static {
- for (_Fields field : EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // GFAC_EXPERIMENT_STATE
- return GFAC_EXPERIMENT_STATE;
- case 2: // TIME_OF_STATE_CHANGE
- return TIME_OF_STATE_CHANGE;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- public static _Fields findByName(String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final String _fieldName;
-
- _Fields(short thriftId, String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __TIMEOFSTATECHANGE_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
- private _Fields optionals[] = {_Fields.TIME_OF_STATE_CHANGE};
- public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.GFAC_EXPERIMENT_STATE, new org.apache.thrift.meta_data.FieldMetaData("gfacExperimentState", org.apache.thrift.TFieldRequirementType.REQUIRED,
- new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, GfacExperimentState.class)));
- tmpMap.put(_Fields.TIME_OF_STATE_CHANGE, new org.apache.thrift.meta_data.FieldMetaData("timeOfStateChange", org.apache.thrift.TFieldRequirementType.OPTIONAL,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
- metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GfacExperimentStatus.class, metaDataMap);
- }
-
- public GfacExperimentStatus() {
- }
-
- public GfacExperimentStatus(
- GfacExperimentState gfacExperimentState)
- {
- this();
- this.gfacExperimentState = gfacExperimentState;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public GfacExperimentStatus(GfacExperimentStatus other) {
- __isset_bitfield = other.__isset_bitfield;
- if (other.isSetGfacExperimentState()) {
- this.gfacExperimentState = other.gfacExperimentState;
- }
- this.timeOfStateChange = other.timeOfStateChange;
- }
-
- public GfacExperimentStatus deepCopy() {
- return new GfacExperimentStatus(this);
- }
-
- @Override
- public void clear() {
- this.gfacExperimentState = null;
- setTimeOfStateChangeIsSet(false);
- this.timeOfStateChange = 0;
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentState getGfacExperimentState() {
- return this.gfacExperimentState;
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentStatus setGfacExperimentState(GfacExperimentState gfacExperimentState) {
- this.gfacExperimentState = gfacExperimentState;
- return this;
- }
-
- public void unsetGfacExperimentState() {
- this.gfacExperimentState = null;
- }
-
- /** Returns true if field gfacExperimentState is set (has been assigned a value) and false otherwise */
- public boolean isSetGfacExperimentState() {
- return this.gfacExperimentState != null;
- }
-
- public void setGfacExperimentStateIsSet(boolean value) {
- if (!value) {
- this.gfacExperimentState = null;
- }
- }
-
- public long getTimeOfStateChange() {
- return this.timeOfStateChange;
- }
-
- public GfacExperimentStatus setTimeOfStateChange(long timeOfStateChange) {
- this.timeOfStateChange = timeOfStateChange;
- setTimeOfStateChangeIsSet(true);
- return this;
- }
-
- public void unsetTimeOfStateChange() {
- __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
- }
-
- /** Returns true if field timeOfStateChange is set (has been assigned a value) and false otherwise */
- public boolean isSetTimeOfStateChange() {
- return EncodingUtils.testBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
- }
-
- public void setTimeOfStateChangeIsSet(boolean value) {
- __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID, value);
- }
-
- public void setFieldValue(_Fields field, Object value) {
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- if (value == null) {
- unsetGfacExperimentState();
- } else {
- setGfacExperimentState((GfacExperimentState)value);
- }
- break;
-
- case TIME_OF_STATE_CHANGE:
- if (value == null) {
- unsetTimeOfStateChange();
- } else {
- setTimeOfStateChange((Long)value);
- }
- break;
-
- }
- }
-
- public Object getFieldValue(_Fields field) {
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- return getGfacExperimentState();
-
- case TIME_OF_STATE_CHANGE:
- return Long.valueOf(getTimeOfStateChange());
-
- }
- throw new IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new IllegalArgumentException();
- }
-
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- return isSetGfacExperimentState();
- case TIME_OF_STATE_CHANGE:
- return isSetTimeOfStateChange();
- }
- throw new IllegalStateException();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == null)
- return false;
- if (that instanceof GfacExperimentStatus)
- return this.equals((GfacExperimentStatus)that);
- return false;
- }
-
- public boolean equals(GfacExperimentStatus that) {
- if (that == null)
- return false;
-
- boolean this_present_gfacExperimentState = true && this.isSetGfacExperimentState();
- boolean that_present_gfacExperimentState = true && that.isSetGfacExperimentState();
- if (this_present_gfacExperimentState || that_present_gfacExperimentState) {
- if (!(this_present_gfacExperimentState && that_present_gfacExperimentState))
- return false;
- if (!this.gfacExperimentState.equals(that.gfacExperimentState))
- return false;
- }
-
- boolean this_present_timeOfStateChange = true && this.isSetTimeOfStateChange();
- boolean that_present_timeOfStateChange = true && that.isSetTimeOfStateChange();
- if (this_present_timeOfStateChange || that_present_timeOfStateChange) {
- if (!(this_present_timeOfStateChange && that_present_timeOfStateChange))
- return false;
- if (this.timeOfStateChange != that.timeOfStateChange)
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- @Override
- public int compareTo(GfacExperimentStatus other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- lastComparison = Boolean.valueOf(isSetGfacExperimentState()).compareTo(other.isSetGfacExperimentState());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetGfacExperimentState()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gfacExperimentState, other.gfacExperimentState);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = Boolean.valueOf(isSetTimeOfStateChange()).compareTo(other.isSetTimeOfStateChange());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetTimeOfStateChange()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timeOfStateChange, other.timeOfStateChange);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("GfacExperimentStatus(");
- boolean first = true;
-
- sb.append("gfacExperimentState:");
- if (this.gfacExperimentState == null) {
- sb.append("null");
- } else {
- sb.append(this.gfacExperimentState);
- }
- first = false;
- if (isSetTimeOfStateChange()) {
- if (!first) sb.append(", ");
- sb.append("timeOfStateChange:");
- sb.append(this.timeOfStateChange);
- first = false;
- }
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- if (gfacExperimentState == null) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'gfacExperimentState' was not present! Struct: " + toString());
- }
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class GfacExperimentStatusStandardSchemeFactory implements SchemeFactory {
- public GfacExperimentStatusStandardScheme getScheme() {
- return new GfacExperimentStatusStandardScheme();
- }
- }
-
- private static class GfacExperimentStatusStandardScheme extends StandardScheme<GfacExperimentStatus> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // GFAC_EXPERIMENT_STATE
- if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
- struct.setGfacExperimentStateIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // TIME_OF_STATE_CHANGE
- if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
- struct.timeOfStateChange = iprot.readI64();
- struct.setTimeOfStateChangeIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (struct.gfacExperimentState != null) {
- oprot.writeFieldBegin(GFAC_EXPERIMENT_STATE_FIELD_DESC);
- oprot.writeI32(struct.gfacExperimentState.getValue());
- oprot.writeFieldEnd();
- }
- if (struct.isSetTimeOfStateChange()) {
- oprot.writeFieldBegin(TIME_OF_STATE_CHANGE_FIELD_DESC);
- oprot.writeI64(struct.timeOfStateChange);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class GfacExperimentStatusTupleSchemeFactory implements SchemeFactory {
- public GfacExperimentStatusTupleScheme getScheme() {
- return new GfacExperimentStatusTupleScheme();
- }
- }
-
- private static class GfacExperimentStatusTupleScheme extends TupleScheme<GfacExperimentStatus> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- TTupleProtocol oprot = (TTupleProtocol) prot;
- oprot.writeI32(struct.gfacExperimentState.getValue());
- BitSet optionals = new BitSet();
- if (struct.isSetTimeOfStateChange()) {
- optionals.set(0);
- }
- oprot.writeBitSet(optionals, 1);
- if (struct.isSetTimeOfStateChange()) {
- oprot.writeI64(struct.timeOfStateChange);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- TTupleProtocol iprot = (TTupleProtocol) prot;
- struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
- struct.setGfacExperimentStateIsSet(true);
- BitSet incoming = iprot.readBitSet(1);
- if (incoming.get(0)) {
- struct.timeOfStateChange = iprot.readI64();
- struct.setTimeOfStateChangeIsSet(true);
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
deleted file mode 100644
index 20099f2..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
+++ /dev/null
@@ -1,59 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class gfacDataModelConstants {
-
- public static final String DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
-
- public static final String DEFAULT_PROJECT_NAME = "DEFAULT";
-
- public static final String SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE";
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 15e2c63..e3705ac 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Random;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.error.LaunchValidationException;
@@ -73,8 +74,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
public OrchestratorServerHandler() {
// registering with zk
try {
- String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
- + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String zkhostPort = AiravataZKUtils.getZKhostPort();
String airavataServerHostPort = ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
try {
@@ -124,6 +124,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
+
/**
* * After creating the experiment Data user have the
* * experimentID as the handler to the experiment, during the launchExperiment
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 23089aa..4629b15 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -21,9 +21,11 @@
package org.apache.airavata.orchestrator.core.impl;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
@@ -74,6 +76,7 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
mutex.wait();
}
}
+ AiravataZKUtils.
String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
List<String> children = zk.getChildren(gfacServer, this);
@@ -92,6 +95,9 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
if (exists1 == null) {
zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+
+ zk.create(newExpNode + File.separator + "state", GfacExperimentState.LAUNCHED.toString().getBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
logger.info("ExperimentID: " + experimentID + " taskID: " + taskID + " is re-running due to gfac failure");
}