You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/12 23:36:36 UTC
[1/6] airavata git commit: Refactored GfacServerHandler
initialization, changed GFac interface , added GfacImpl
Repository: airavata
Updated Branches:
refs/heads/master 8a6b891d3 -> e76851d04
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
index ad9e62a..a78d3f0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
@@ -20,13 +20,13 @@
*/
package org.apache.airavata.gfac.impl;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacConfiguration;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.Scheduler;
@@ -98,7 +98,7 @@ public class BetterGfacImpl implements GFac {
private static String ERROR_SENT = "ErrorSent";
private ExperimentCatalog experimentCatalog;
private CuratorFramework curatorClient;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private static GFac gfacInstance;
private boolean initialized = false;
@@ -117,11 +117,10 @@ public class BetterGfacImpl implements GFac {
return gfacInstance;
}
- @Override
public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient,
- MonitorPublisher publisher) {
+ LocalEventPublisher publisher) {
this.experimentCatalog = experimentCatalog;
- monitorPublisher = publisher; // This is a EventBus common for gfac
+ localEventPublisher = publisher; // This is a EventBus common for gfac
this.curatorClient = curatorClient;
return initialized = true;
}
@@ -153,13 +152,13 @@ public class BetterGfacImpl implements GFac {
// FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
// task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
if (jobExecutionContext != null) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
}
throw new GFacException(e);
}
@@ -250,10 +249,10 @@ public class BetterGfacImpl implements GFac {
List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
- jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+ jobExecutionContext.setProperty(GFacConstants.PROP_TOPIC, experimentID);
jobExecutionContext.setGfac(gfacInstance);
jobExecutionContext.setCuratorClient(curatorClient);
- jobExecutionContext.setMonitorPublisher(monitorPublisher);
+ jobExecutionContext.setLocalEventPublisher(localEventPublisher);
// handle job submission protocol
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
@@ -431,7 +430,7 @@ public class BetterGfacImpl implements GFac {
// Register log event listener. This is required in all scenarios.
if (isNewJob(gfacExpState)) {
// In this scenario We do everything from the beginning
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
launch(jobExecutionContext);
} else if (isCompletedJob(gfacExpState)) {
@@ -492,7 +491,7 @@ public class BetterGfacImpl implements GFac {
try {
GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
String workflowInstanceID = null;
- if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+ if ((workflowInstanceID = (String) jobExecutionContext.getProperty(GFacConstants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
//todo implement WorkflowTrackingListener properly
}
if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
@@ -503,7 +502,7 @@ public class BetterGfacImpl implements GFac {
invokeProviderCancel(jobExecutionContext);
} catch (GFacException e) {
// we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
jobExecutionContext.setProperty(ERROR_SENT, "true");
throw new GFacException(e.getMessage(), e);
}
@@ -578,13 +577,13 @@ public class BetterGfacImpl implements GFac {
log.error(e.getMessage(), e);
try {
// we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
JobIdentifier jobIdentity = new JobIdentifier(
jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
@@ -593,7 +592,7 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+ localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
}
@@ -632,7 +631,7 @@ public class BetterGfacImpl implements GFac {
// avoid complexity
} else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
}
// if (experimentID != null){
@@ -646,19 +645,19 @@ public class BetterGfacImpl implements GFac {
invokeProviderExecute(jobExecutionContext);
} else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
return;
}
} catch (Exception e) {
try {
// we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- // monitorPublisher.publish(new
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ // localEventPublisher.publish(new
// ExperimentStatusChangedEvent(new
// ExperimentIdentity(jobExecutionContext.getExperimentID()),
// ExperimentState.FAILED));
// Updating the task status if there's any task associated
- // monitorPublisher.publish(new TaskStatusChangeRequest(
+ // localEventPublisher.publish(new TaskStatusChangeRequest(
// new TaskIdentity(jobExecutionContext.getExperimentID(),
// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
// jobExecutionContext.getTaskData().getTaskID()),
@@ -668,17 +667,17 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
- //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+ //localEventPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
// Updating the task status if there's any task associated
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+ localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
}
jobExecutionContext.setProperty(ERROR_SENT, "true");
@@ -689,13 +688,13 @@ public class BetterGfacImpl implements GFac {
private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
invokeOutFlowHandlers(jobExecutionContext);
@@ -706,7 +705,7 @@ public class BetterGfacImpl implements GFac {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
if (submit) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
@@ -717,11 +716,11 @@ public class BetterGfacImpl implements GFac {
provider.recover(jobExecutionContext);
}
GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
} else {
disposeProvider(provider, jobExecutionContext);
GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
}
@@ -748,7 +747,7 @@ public class BetterGfacImpl implements GFac {
private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
@@ -759,7 +758,7 @@ public class BetterGfacImpl implements GFac {
provider.recover(jobExecutionContext);
}
GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
if (GFacUtils.isSynchronousMode(jobExecutionContext))
@@ -815,7 +814,7 @@ public class BetterGfacImpl implements GFac {
private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
try {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
if (!isCancelling(jobExecutionContext)) {
@@ -842,11 +841,11 @@ public class BetterGfacImpl implements GFac {
}
} else {
log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
break;
}
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKED));
} catch (Exception e) {
throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
@@ -879,7 +878,7 @@ public class BetterGfacImpl implements GFac {
}
}
try {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
if (!isCancel(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
@@ -900,7 +899,7 @@ public class BetterGfacImpl implements GFac {
handler.invoke(jobExecutionContext);
GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (Exception e) {
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED);
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
@@ -913,12 +912,12 @@ public class BetterGfacImpl implements GFac {
} else {
log.info("Experiment execution is cancelled, so OutHandler invocation is stopped");
if (isCancelling(jobExecutionContext)) {
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
}
break;
}
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
} catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
}
@@ -928,7 +927,7 @@ public class BetterGfacImpl implements GFac {
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
-// monitorPublisher.publish(new
+// localEventPublisher.publish(new
// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
// ExperimentState.COMPLETED));
// Updating the task status if there's any task associated
@@ -936,8 +935,8 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+ localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
@@ -952,7 +951,7 @@ public class BetterGfacImpl implements GFac {
private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
try {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
Class<? extends GFacHandler> handlerClass;
@@ -982,7 +981,7 @@ public class BetterGfacImpl implements GFac {
throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
}
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKED));
} catch (Exception e) {
try {
@@ -1016,7 +1015,7 @@ public class BetterGfacImpl implements GFac {
}
launch(jobExecutionContext);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
@@ -1076,11 +1075,11 @@ public class BetterGfacImpl implements GFac {
throw new GFacException("Error Executing a OutFlow Handler", e);
}
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
-// monitorPublisher.publish(new
+// localEventPublisher.publish(new
// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
// ExperimentState.COMPLETED));
// Updating the task status if there's any task associated
@@ -1089,8 +1088,8 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+ localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
private boolean isCancelled(JobExecutionContext executionContext) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
new file mode 100644
index 0000000..827ab55
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
@@ -0,0 +1,28 @@
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public class GFacImpl implements GFac {
+
+ @Override
+ public boolean submitProcess(ProcessContext processContext) throws GFacException {
+ return false;
+ }
+
+ @Override
+ public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public boolean cancelProcess(ProcessContext processContext) throws GFacException {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
index 048889a..b682007 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.gfac.impl;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.GFac;
@@ -44,20 +44,20 @@ public class OutHandlerWorker implements Runnable {
private MonitorID monitorID;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private JobExecutionContext jEC;
- public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+ public OutHandlerWorker(GFac gfac, MonitorID monitorID,LocalEventPublisher localEventPublisher) {
this.gfac = gfac;
this.monitorID = monitorID;
- this.monitorPublisher = monitorPublisher;
+ this.localEventPublisher = localEventPublisher;
this.jEC = monitorID.getJobExecutionContext();
}
public OutHandlerWorker(JobExecutionContext jEC) {
this.jEC = jEC;
this.gfac = jEC.getGfac();
- this.monitorPublisher = jEC.getMonitorPublisher();
+ this.localEventPublisher = jEC.getLocalEventPublisher();
}
@Override
@@ -69,7 +69,7 @@ public class OutHandlerWorker implements Runnable {
logger.error(e.getMessage(),e);
TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
//FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
- monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
+ localEventPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
@@ -81,8 +81,8 @@ public class OutHandlerWorker implements Runnable {
// Save error details to registry
}
-// monitorPublisher.publish(monitorID.getStatus());
- monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
+// localEventPublisher.publish(monitorID.getStatus());
+ localEventPublisher.publish(jEC.getJobDetails().getJobStatus());
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index a0ace45..5babd92 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -29,7 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.provider.AbstractProvider;
@@ -112,8 +112,8 @@ public class LocalProvider extends AbstractProvider {
initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
// extra environment variables
- builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
- builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
+ builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
+ builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
// set working directory
builder.directory(new File(jobExecutionContext.getWorkingDir()));
@@ -178,7 +178,7 @@ public class LocalProvider extends AbstractProvider {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+ jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
} catch (IOException io) {
throw new GFacProviderException(io.getMessage(), io);
} catch (InterruptedException e) {
@@ -234,7 +234,7 @@ public class LocalProvider extends AbstractProvider {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+ jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
} catch (XmlException e) {
throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
} catch (IOException io) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
index b4ac3a9..72ffad6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -20,7 +20,6 @@
*/
package org.apache.airavata.gfac.monitor.core;
-import org.apache.airavata.common.utils.MonitorPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory;
* This is the abstract Monitor which needs to be used by
* any Monitoring implementation which expect nto consume
* to store the status to registry. Because they have to
- * use the MonitorPublisher to publish the monitoring statuses
+ * use the LocalEventPublisher to publish the monitoring statuses
* to the Event Bus. All the Monitor statuses publish to the eventbus
* will be saved to the Registry.
*/
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 992317d..2c6b69b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -323,7 +323,7 @@ public class EmailBasedMonitor implements Runnable{
"experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getTaskId());
- jobExecutionContext.getMonitorPublisher().publish(jobStatus);
+ jobExecutionContext.getLocalEventPublisher().publish(jobStatus);
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 58c0946..a7e5b90 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -75,7 +75,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
super.invoke(jobExecutionContext);
hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
- hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
+ hpcPullMonitor.setPublisher(jobExecutionContext.getLocalEventPublisher());
MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
try {
/* ZooKeeper zk = jobExecutionContext.getZk();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 54dd8e3..d9e815b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.impl.pull.qstat;
import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -64,7 +64,7 @@ public class HPCPullMonitor extends PullMonitor {
private Map<String, ResourceConnection> connections;
- private MonitorPublisher publisher;
+ private LocalEventPublisher publisher;
private LinkedBlockingQueue<String> cancelJobList;
@@ -79,17 +79,17 @@ public class HPCPullMonitor extends PullMonitor {
public HPCPullMonitor() {
connections = new HashMap<String, ResourceConnection>();
queue = new LinkedBlockingDeque<UserMonitorData>();
- publisher = new MonitorPublisher(new EventBus());
+ publisher = new LocalEventPublisher(new EventBus());
cancelJobList = new LinkedBlockingQueue<String>();
completedJobsFromPush = new ArrayList<String>();
(new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
removeList = new ArrayList<MonitorID>();
}
- public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
+ public HPCPullMonitor(LocalEventPublisher localEventPublisher, AuthenticationInfo authInfo) {
connections = new HashMap<String, ResourceConnection>();
queue = new LinkedBlockingDeque<UserMonitorData>();
- publisher = monitorPublisher;
+ publisher = localEventPublisher;
authenticationInfo = authInfo;
cancelJobList = new LinkedBlockingQueue<String>();
this.completedJobsFromPush = new ArrayList<String>();
@@ -97,7 +97,7 @@ public class HPCPullMonitor extends PullMonitor {
removeList = new ArrayList<MonitorID>();
}
- public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+ public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, LocalEventPublisher publisher) {
this.queue = queue;
this.publisher = publisher;
connections = new HashMap<String, ResourceConnection>();
@@ -396,11 +396,11 @@ public class HPCPullMonitor extends PullMonitor {
return true;
}
- public MonitorPublisher getPublisher() {
+ public LocalEventPublisher getPublisher() {
return publisher;
}
- public void setPublisher(MonitorPublisher publisher) {
+ public void setPublisher(LocalEventPublisher publisher) {
this.publisher = publisher;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index de8cd8c..0d52f95 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.core.PushMonitor;
@@ -62,9 +62,9 @@ public class AMQPMonitor extends PushMonitor {
*/
private Map<String, Channel> availableChannels;
- private MonitorPublisher publisher;
+ private LocalEventPublisher publisher;
- private MonitorPublisher localPublisher;
+ private LocalEventPublisher localPublisher;
private BlockingQueue<MonitorID> runningQueue;
@@ -81,7 +81,7 @@ public class AMQPMonitor extends PushMonitor {
public AMQPMonitor(){
}
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+ public AMQPMonitor(LocalEventPublisher publisher, BlockingQueue<MonitorID> runningQueue,
BlockingQueue<MonitorID> finishQueue,
String proxyPath,String connectionName,List<String> hosts) {
this.publisher = publisher;
@@ -91,7 +91,7 @@ public class AMQPMonitor extends PushMonitor {
this.connectionName = connectionName;
this.proxyPath = proxyPath;
this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher = new LocalEventPublisher(new EventBus());
this.localPublisher.registerListener(this);
}
@@ -100,7 +100,7 @@ public class AMQPMonitor extends PushMonitor {
this.connectionName = connectionName;
this.proxyPath = proxyPath;
this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher = new LocalEventPublisher(new EventBus());
this.localPublisher.registerListener(this);
}
@@ -230,11 +230,11 @@ public class AMQPMonitor extends PushMonitor {
this.availableChannels = availableChannels;
}
- public MonitorPublisher getPublisher() {
+ public LocalEventPublisher getPublisher() {
return publisher;
}
- public void setPublisher(MonitorPublisher publisher) {
+ public void setPublisher(LocalEventPublisher publisher) {
this.publisher = publisher;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
index bd5c625..4247524 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.core.MessageParser;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
@@ -37,9 +37,9 @@ public class BasicConsumer implements Consumer {
private MessageParser parser;
- private MonitorPublisher publisher;
+ private LocalEventPublisher publisher;
- public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+ public BasicConsumer(MessageParser parser, LocalEventPublisher publisher) {
this.parser = parser;
this.publisher = publisher;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
index a131557..3980dac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -21,7 +21,7 @@
package org.apache.airavata.gfac.ssh.handler;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -92,7 +92,7 @@ public class SSHOutputHandler extends AbstractHandler {
String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
TaskDetails taskData = jobExecutionContext.getTaskData();
- String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
+ String outputDataDir = ServerSettings.getSetting(GFacConstants.OUTPUT_DATA_DIR, File.separator + "tmp");
File localStdOutFile;
File localStdErrFile;
//FIXME: AdvancedOutput is remote location and third party transfer should work to make this work
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 38de3ba..988c604 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -26,8 +26,8 @@ import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.JobDescriptor;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -95,7 +95,7 @@ public class SSHProvider extends AbstractProvider {
jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis();
remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster();
- String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME;
details.setJobID(taskID);
details.setJobDescription(remoteFile);
jobExecutionContext.setJobDetails(details);
@@ -125,7 +125,7 @@ public class SSHProvider extends AbstractProvider {
/*
* Execute
*/
- String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME;
details.setJobDescription(executable);
RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable);
StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
@@ -141,7 +141,7 @@ public class SSHProvider extends AbstractProvider {
StringBuffer data = new StringBuffer();
JobDetails jobDetails = new JobDetails();
String hostAddress = jobExecutionContext.getHostName();
- MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher();
+ LocalEventPublisher localEventPublisher = jobExecutionContext.getLocalEventPublisher();
try {
RemoteCluster remoteCluster = null;
if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
@@ -162,11 +162,11 @@ public class SSHProvider extends AbstractProvider {
if (jobID != null && !jobID.isEmpty()) {
jobDetails.setJobID(jobID);
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.JOBSUBMITTED));
jobExecutionContext.setJobDetails(jobDetails);
if (verifyJobSubmissionByJobId(remoteCluster, jobID)) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.JOBSUBMITTED));
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
}
@@ -179,7 +179,7 @@ public class SSHProvider extends AbstractProvider {
// JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
jobID = verifyJobId;
jobDetails.setJobID(jobID);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.JOBSUBMITTED));
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
break;
@@ -193,7 +193,7 @@ public class SSHProvider extends AbstractProvider {
+ jobDetails.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
log.error(msg);
GFacUtils.saveErrorDetails(jobExecutionContext, msg, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
+ GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED);
return;
}
data.append("jobDesc=").append(jobDescriptor.toXML());
@@ -303,8 +303,8 @@ public class SSHProvider extends AbstractProvider {
out.write("#!/bin/bash\n".getBytes());
out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n")
+ out.write(("export " + GFacConstants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes());
+ out.write(("export " + GFacConstants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n")
.getBytes());
// get the env of the host and the application
List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
index 049af7f..3fb97dc 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
@@ -26,7 +26,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.credential.Credential;
import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.RequestData;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -151,11 +151,11 @@ public class TokenizedSSHAuthInfo implements SSHPublicKeyFileAuthentication {
*/
public SSHCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException, IOException {
Properties configurationProperties = ServerSettings.getProperties();
- String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
+ String sshUserName = configurationProperties.getProperty(GFacConstants.SSH_USER_NAME);
this.getRequestData().setRequestUser(sshUserName);
- this.privateKeyFile = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
- this.publicKeyFile = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
- this.passPhrase = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
+ this.privateKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY);
+ this.publicKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PUBLIC_KEY);
+ this.passPhrase = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY_PASS);
this.getRequestData().setRequestUser(sshUserName);
return new SSHCredential(IOUtil.readToByteArray(new File(this.privateKeyFile)), IOUtil.readToByteArray(new File(this.publicKeyFile)), this.passPhrase, requestData.getGatewayId(), sshUserName);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 69c7df4..ce80232 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -27,7 +27,7 @@ import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.RequestData;
import org.apache.airavata.gfac.core.JobDescriptor;
@@ -111,7 +111,7 @@ public class GFACSSHUtils {
if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){
// now we fall back to username password authentication
Properties configurationProperties = ServerSettings.getProperties();
- tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD));
+ tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(GFacConstants.SSH_PASSWORD));
}
// This should be the login user name from compute resource preference
String loginUser = jobExecutionContext.getLoginUserName();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 098b966..c63942d 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -25,7 +25,7 @@
//import java.util.ArrayList;
//import java.util.List;
//
-//import org.apache.airavata.common.utils.MonitorPublisher;
+//import org.apache.airavata.common.utils.LocalEventPublisher;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
//import org.apache.airavata.commons.gfac.type.ApplicationDescription;
//import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -176,7 +176,7 @@
// LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
// localDirectorySetupHandler.invoke(jobExecutionContext);
// LocalProvider localProvider = new LocalProvider();
-// localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
+// localProvider.setLocalEventPublisher(new LocalEventPublisher(new EventBus()));
// localProvider.initialize(jobExecutionContext);
// localProvider.execute(jobExecutionContext);
// localProvider.dispose(jobExecutionContext);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 0ffa02e..6364940 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -26,7 +26,7 @@ import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.gsi.ssh.impl.HPCRemoteCluster;
import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.JobDescriptor;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
@@ -68,7 +68,7 @@ public class AMQPMonitorTest {
private String certificateLocation;
private String pbsFilePath;
private String workingDirectory;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private BlockingQueue<MonitorID> finishQueue;
private BlockingQueue<MonitorID> pushQueue;
private Thread pushThread;
@@ -96,13 +96,13 @@ public class AMQPMonitorTest {
throw new Exception("Need my proxy user name password to run tests.");
}
- monitorPublisher = new MonitorPublisher(new EventBus());
+ localEventPublisher = new LocalEventPublisher(new EventBus());
pushQueue = new LinkedBlockingQueue<MonitorID>();
finishQueue = new LinkedBlockingQueue<MonitorID>();
final AMQPMonitor amqpMonitor = new
- AMQPMonitor(monitorPublisher,
+ AMQPMonitor(localEventPublisher,
pushQueue, finishQueue,proxyFilePath,"xsede",
Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
try {
@@ -195,7 +195,7 @@ public class AMQPMonitorTest {
pushThread.interrupt();
}
}
- monitorPublisher.registerListener(new InnerClassAMQP());
+ localEventPublisher.registerListener(new InnerClassAMQP());
// try {
// pushThread.join(5000);
// Iterator<MonitorID> iterator = pushQueue.iterator();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 70727f7..cc33a96 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -26,7 +26,7 @@
//import java.util.concurrent.BlockingQueue;
//import java.util.concurrent.LinkedBlockingQueue;
//
-//import org.apache.airavata.common.utils.MonitorPublisher;
+//import org.apache.airavata.common.utils.LocalEventPublisher;
//import org.apache.airavata.commons.gfac.type.HostDescription;
//import org.apache.airavata.gfac.core.monitor.MonitorID;
//import org.apache.airavata.gfac.monitor.HPCMonitorID;
@@ -55,7 +55,7 @@
// private String pbsFilePath;
// private String workingDirectory;
// private HostDescription hostDescription;
-// private MonitorPublisher monitorPublisher;
+// private LocalEventPublisher monitorPublisher;
// private BlockingQueue<UserMonitorData> pullQueue;
// private Thread monitorThread;
//
@@ -76,7 +76,7 @@
// throw new Exception("Need my proxy user name password to run tests.");
// }
//
-// monitorPublisher = new MonitorPublisher(new EventBus());
+// monitorPublisher = new LocalEventPublisher(new EventBus());
// class InnerClassQstat {
//
// @Subscribe
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9e89788..382cd5c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -21,6 +21,11 @@
package org.apache.airavata.gfac.server;
import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.AiravataStartupException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacConstants;
+import org.apache.airavata.gfac.core.GFacWorker;
+import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -28,16 +33,12 @@ import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.gfac.core.GFacConfiguration;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -63,18 +64,13 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.xml.sax.SAXException;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPathExpressionException;
import java.io.File;
-import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -84,53 +80,67 @@ import java.util.concurrent.BlockingQueue;
public class GfacServerHandler implements GfacService.Iface {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
- private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
private static int requestCount=0;
private ExperimentCatalog experimentCatalog;
private AppCatalog appCatalog;
- private String gatewayName;
private String airavataUserName;
private CuratorFramework curatorClient;
- private MonitorPublisher publisher;
- private String gfacServer;
- private String gfacExperiments;
+ private LocalEventPublisher localEventPublisher;
private String airavataServerHostPort;
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
private static File gfacConfigFile;
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
- public GfacServerHandler() throws Exception {
+ public GfacServerHandler() throws AiravataStartupException {
try {
-
- // start curator client
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
- curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
- curatorClient.start();
- gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
- gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
- + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
- storeServerConfig();
- publisher = new MonitorPublisher(new EventBus());
+ startCuratorClient();
+ initZkDataStructure();
+ initAMQPClient();
+ localEventPublisher = new LocalEventPublisher(new EventBus());
experimentCatalog = RegistryFactory.getDefaultExpCatalog();
appCatalog = RegistryFactory.getAppCatalog();
- setGatewayProperties();
- startDaemonHandlers();
- // initializing Better Gfac Instance
- BetterGfacImpl.getInstance().init(experimentCatalog, appCatalog, curatorClient, publisher);
- if (ServerSettings.isGFacPassiveMode()) {
- rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
- rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
- }
- startStatusUpdators(experimentCatalog, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
-
+ startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
} catch (Exception e) {
- throw new Exception("Error initialising GFAC", e);
+ throw new AiravataStartupException("Gfac Server Initialization error ", e);
}
}
+ private void initAMQPClient() throws AiravataException {
+ rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+ }
+
+ private void startCuratorClient() throws ApplicationSettingsException {
+ String connectionSting = ServerSettings.getZookeeperConnection();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+ curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+ curatorClient.start();
+ }
+
+ private void initZkDataStructure() throws Exception {
+ /*
+ *|/servers
+ * - /gfac
+ * - /gfac-node0 (localhost:2181)
+ *|/experiments
+ */
+ airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
+ // create PERSISTENT nodes
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+ // create EPHEMERAL server name node
+ String gfacName = ServerSettings.getGFacServerName();
+ if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)) == null) {
+ curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+ .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName));
+
+ }
+ curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() +
+ (gfacName.startsWith("/") ? gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes());
+ }
+
public static void main(String[] args) {
RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
try {
@@ -140,29 +150,6 @@ public class GfacServerHandler implements GfacService.Iface {
logger.error(e.getMessage(), e);
}
}
- private void storeServerConfig() throws Exception {
- Stat stat = curatorClient.checkExists().forPath(gfacServer);
- if (stat == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath(gfacServer, new byte[0]);
- }
- String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
- String instanceNode = gfacServer + File.separator + instanceId;
- stat = curatorClient.checkExists().forPath(instanceNode);
- if (stat == null) {
- curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
- curatorClient.getChildren().watched().forPath(instanceNode);
- }
- stat = curatorClient.checkExists().forPath(gfacExperiments);
- if (stat == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
- }
- stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
- if (stat == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
- }
- }
private long ByateArrayToLong(byte[] data) {
long value = 0;
@@ -190,20 +177,27 @@ public class GfacServerHandler implements GfacService.Iface {
* *
* *
*
- * @param experimentId
- * @param taskId
- * @param gatewayId
+ * @param experimentId - ExperimentModel id in registry
+ * @param processId - processModel id in registry
+ * @param gatewayId - gateway Identification
*/
- public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
+ public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws TException {
requestCount++;
logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
- logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
+ logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId, processId);
+ ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+ processContext.setAppCatalog(appCatalog);
+ processContext.setExperimentCatalog(experimentCatalog);
+ processContext.setCuratorClient(curatorClient);
+ processContext.setLocalEventPublisher(localEventPublisher);
+
+ GFacWorker worker = new GFacWorker(processContext);
InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId,
- taskId, gatewayId, tokenId);
+ processId, gatewayId, tokenId);
// try {
// if( gfac.submitJob(experimentId, taskId, gatewayId)){
logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
- "{}", experimentId, taskId, gatewayId);
+ "{}", experimentId, processId, gatewayId);
GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker);
@@ -235,60 +229,14 @@ public class GfacServerHandler implements GfacService.Iface {
this.experimentCatalog = experimentCatalog;
}
- public String getGatewayName() {
- return gatewayName;
- }
-
- public void setGatewayName(String gatewayName) {
- this.gatewayName = gatewayName;
- }
-
- public String getAiravataUserName() {
- return airavataUserName;
- }
-
- public void setAiravataUserName(String airavataUserName) {
- this.airavataUserName = airavataUserName;
- }
-
- protected void setGatewayProperties() throws ApplicationSettingsException {
- setAiravataUserName(ServerSettings.getDefaultUser());
- setGatewayName(ServerSettings.getDefaultUserGateway());
- }
private GFac getGfac() throws TException {
GFac gFac = BetterGfacImpl.getInstance();
- gFac.init(experimentCatalog, appCatalog, curatorClient, publisher);
+ gFac.init(experimentCatalog, appCatalog, curatorClient, localEventPublisher);
return gFac;
}
- public void startDaemonHandlers() {
- List<GFacHandlerConfig> daemonHandlerConfig = null;
- String className = null;
- try {
- URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
- if (resource != null) {
- gfacConfigFile = new File(resource.getPath());
- }
- daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
- for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
- className = handlerConfig.getClassName();
- Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
- ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
- threadedHandler.initProperties(handlerConfig.getProperties());
- daemonHandlers.add(threadedHandler);
- }
- } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
- InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
- logger.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- }
- for (ThreadedHandler tHandler : daemonHandlers) {
- (new Thread(tHandler)).start();
- }
- }
-
-
- public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, MonitorPublisher publisher,
+ public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher,
RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
try {
@@ -337,9 +285,9 @@ public class GfacServerHandler implements GfacService.Iface {
private String experimentNode;
private String nodeName;
- public TaskLaunchMessageHandler() {
- experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0");
+ public TaskLaunchMessageHandler() throws ApplicationSettingsException {
+ experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+ nodeName = ServerSettings.getGFacServerName();
}
public Map<String, Object> getProperties() {
@@ -366,7 +314,6 @@ public class GfacServerHandler implements GfacService.Iface {
status.setExperimentState(ExperimentState.EXECUTING);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
- experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
try {
GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
index e133be3..0f929df 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
@@ -21,7 +21,7 @@
package org.apache.airavata.workflow.engine.util;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
public class ProxyMonitorPublisher implements AbstractActivityListener{
@@ -33,11 +33,11 @@ public class ProxyMonitorPublisher implements AbstractActivityListener{
setupConfigurations=configurations;
}
- private static MonitorPublisher getPublisher(){
+ private static LocalEventPublisher getPublisher(){
if (setupConfigurations!=null) {
for (Object configuration : setupConfigurations) {
- if (configuration instanceof MonitorPublisher){
- return (MonitorPublisher) configuration;
+ if (configuration instanceof LocalEventPublisher){
+ return (LocalEventPublisher) configuration;
}
}
}
[3/6] airavata git commit: Implemented gfac worker
Posted by sh...@apache.org.
Implemented gfac worker
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2654de09
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2654de09
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2654de09
Branch: refs/heads/master
Commit: 2654de094e76a937643b2ea3ca9fff5efa88c50e
Parents: 8535ff1
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Jun 12 14:40:34 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri Jun 12 14:40:34 2015 -0400
----------------------------------------------------------------------
.../airavata/common/logger/AiravataLogger.java | 677 -------------------
.../common/logger/AiravataLoggerFactory.java | 34 -
.../common/logger/AiravataLoggerImpl.java | 323 ---------
.../airavata/common/utils/ServerSettings.java | 22 +
.../main/resources/airavata-server.properties | 4 +-
.../org/apache/airavata/gfac/core/GFac.java | 36 +-
.../gfac/core/GFacThreadPoolExecutor.java | 6 +-
.../apache/airavata/gfac/core/GFacUtils.java | 5 +
.../apache/airavata/gfac/core/GFacWorker.java | 37 -
.../airavata/gfac/impl/BetterGfacImpl.java | 1 +
.../apache/airavata/gfac/impl/GFacEngine.java | 53 ++
.../org/apache/airavata/gfac/impl/GFacImpl.java | 28 -
.../apache/airavata/gfac/impl/GFacWorker.java | 87 +++
.../airavata/gfac/server/GfacServerHandler.java | 95 ++-
14 files changed, 234 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java
deleted file mode 100644
index 9477cc9..0000000
--- a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java
+++ /dev/null
@@ -1,677 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.common.logger;
-
-public interface AiravataLogger{
-
- /**
- * Return the name of this <code>Logger</code> instance.
- *
- * @return name of this logger instance
- */
- String getName();
-
- /**
- * Is the logger instance enabled for the TRACE level?
- *
- * @return True if this Logger is enabled for the TRACE level,
- * false otherwise.
- * @since 1.4
- */
- boolean isTraceEnabled();
-
- /**
- * Log a message at the TRACE level.
- *
- * @param msg the message string to be logged
- * @since 1.4
- */
- void trace(String msg);
-
- /**
- * Log a message at the TRACE level.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message string to be logged
- * @since 1.4
- */
- void traceId(String etjId, String msg);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the TRACE level. </p>
- *
- * @param format the format string
- * @param arg the argument
- * @since 1.4
- */
- void trace(String format, Object arg);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the TRACE level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg the argument
- * @since 1.4
- */
- void traceId(String etjId, String format, Object arg);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the TRACE level. </p>
- *
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- * @since 1.4
- */
- void trace(String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the TRACE level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- * @since 1.4
- */
- void traceId(String etjId, String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the TRACE level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for TRACE. The variants taking {@link #trace(String, Object) one} and
- * {@link #trace(String, Object, Object) two} arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- * @since 1.4
- */
- void trace(String format, Object... arguments);
-
- /**
- * Log a message at the TRACE level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the TRACE level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for TRACE. The variants taking {@link #trace(String, Object) one} and
- * {@link #trace(String, Object, Object) two} arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- * @since 1.4
- */
- void traceId(String etjId, String format, Object... arguments);
-
- /**
- * Log an exception (throwable) at the TRACE level with an
- * accompanying message.
- *
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- * @since 1.4
- */
- void trace(String msg, Throwable t);
-
- /**
- * Log an exception (throwable) at the TRACE level with an
- * accompanying message.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- * @since 1.4
- */
- void traceId(String etjId, String msg, Throwable t);
-
- /**
- * Is the logger instance enabled for the DEBUG level?
- *
- * @return True if this Logger is enabled for the DEBUG level,
- * false otherwise.
- */
- boolean isDebugEnabled();
-
- /**
- * Log a message at the DEBUG level.
- *
- * @param msg the message string to be logged
- */
- void debug(String msg);
-
- /**
- * Log a message at the DEBUG level.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message string to be logged
- */
- void debugId(String etjId, String msg);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the DEBUG level. </p>
- *
- * @param format the format string
- * @param arg the argument
- */
- void debug(String format, Object arg);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the DEBUG level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg the argument
- */
- void debugId(String etjId, String format, Object arg);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the DEBUG level. </p>
- *
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void debug(String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the DEBUG level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void debugId(String etjId, String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the DEBUG level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for DEBUG. The variants taking
- * {@link #debug(String, Object) one} and {@link #debug(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void debug(String format, Object... arguments);
-
- /**
- * Log a message at the DEBUG level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the DEBUG level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for DEBUG. The variants taking
- * {@link #debug(String, Object) one} and {@link #debug(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void debugId(String etjId, String format, Object... arguments);
-
- /**
- * Log an exception (throwable) at the DEBUG level with an
- * accompanying message.
- *
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void debug(String msg, Throwable t);
-
- /**
- * Log an exception (throwable) at the DEBUG level with an
- * accompanying message.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void debugId(String etjId, String msg, Throwable t);
-
- /**
- * Is the logger instance enabled for the INFO level?
- *
- * @return True if this Logger is enabled for the INFO level,
- * false otherwise.
- */
- boolean isInfoEnabled();
-
- /**
- * Log a message at the INFO level.
- *
- * @param msg the message string to be logged
- */
- void info(String msg);
-
- /**
- * Log a message at the INFO level.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message string to be logged
- */
- void infoId(String etjId, String msg);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the INFO level. </p>
- *
- * @param format the format string
- * @param arg the argument
- */
- void info(String format, Object arg);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the INFO level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg the argument
- */
- void infoId(String etjId, String format, Object arg);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the INFO level. </p>
- *
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void info(String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the INFO level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void infoId(String etjId, String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the INFO level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for INFO. The variants taking
- * {@link #info(String, Object) one} and {@link #info(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void info(String format, Object... arguments);
-
- /**
- * Log a message at the INFO level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the INFO level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for INFO. The variants taking
- * {@link #info(String, Object) one} and {@link #info(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void infoId(String etjId, String format, Object... arguments);
-
- /**
- * Log an exception (throwable) at the INFO level with an
- * accompanying message.
- *
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void info(String msg, Throwable t);
-
- /**
- * Log an exception (throwable) at the INFO level with an
- * accompanying message.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void infoId(String etjId, String msg, Throwable t);
-
- /**
- * Is the logger instance enabled for the WARN level?
- *
- * @return True if this Logger is enabled for the WARN level,
- * false otherwise.
- */
- boolean isWarnEnabled();
-
- /**
- * Log a message at the WARN level.
- *
- * @param msg the message string to be logged
- */
- void warn(String msg);
-
- /**
- * Log a message at the WARN level.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message string to be logged
- */
- void warnId(String etjId, String msg);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the WARN level. </p>
- *
- * @param format the format string
- * @param arg the argument
- */
- void warn(String format, Object arg);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the WARN level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg the argument
- */
- void warnId(String etjId, String format, Object arg);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the WARN level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for WARN. The variants taking
- * {@link #warn(String, Object) one} and {@link #warn(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void warn(String format, Object... arguments);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the WARN level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for WARN. The variants taking
- * {@link #warn(String, Object) one} and {@link #warn(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void warnId(String etjId, String format, Object... arguments);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the WARN level. </p>
- *
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void warn(String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the WARN level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the WARN level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void warnId(String etjId, String format, Object arg1, Object arg2);
-
- /**
- * Log an exception (throwable) at the WARN level with an
- * accompanying message.
- *
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void warn(String msg, Throwable t);
-
- /**
- * Log an exception (throwable) at the WARN level with an
- * accompanying message.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void warnId(String etjId, String msg, Throwable t);
-
- /**
- * Is the logger instance enabled for the ERROR level?
- *
- * @return True if this Logger is enabled for the ERROR level,
- * false otherwise.
- */
- boolean isErrorEnabled();
-
- /**
- * Log a message at the ERROR level.
- *
- * @param msg the message string to be logged
- */
- void error(String msg);
-
- /**
- * Log a message at the ERROR level.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message string to be logged
- */
- void errorId(String etjId, String msg);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the ERROR level. </p>
- *
- * @param format the format string
- * @param arg the argument
- */
- void error(String format, Object arg);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and argument.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the ERROR level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg the argument
- */
- void errorId(String etjId, String format, Object arg);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the ERROR level. </p>
- *
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void error(String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous object creation when the logger
- * is disabled for the ERROR level. </p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arg1 the first argument
- * @param arg2 the second argument
- */
- void errorId(String etjId, String format, Object arg1, Object arg2);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the ERROR level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for ERROR. The variants taking
- * {@link #error(String, Object) one} and {@link #error(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void error(String format, Object... arguments);
-
- /**
- * Log a message at the ERROR level according to the specified format
- * and arguments.
- * <p/>
- * <p>This form avoids superfluous string concatenation when the logger
- * is disabled for the ERROR level. However, this variant incurs the hidden
- * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method,
- * even if this logger is disabled for ERROR. The variants taking
- * {@link #error(String, Object) one} and {@link #error(String, Object, Object) two}
- * arguments exist solely in order to avoid this hidden cost.</p>
- *
- * @param etjId - Experiment , Task or Job Id
- * @param format the format string
- * @param arguments a list of 3 or more arguments
- */
- void errorId(String etjId, String format, Object... arguments);
-
- /**
- * Log an exception (throwable) at the ERROR level with an
- * accompanying message.
- *
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void error(String msg, Throwable t);
-
- /**
- * Log an exception (throwable) at the ERROR level with an
- * accompanying message.
- *
- * @param etjId - Experiment , Task or Job Id
- * @param msg the message accompanying the exception
- * @param t the exception (throwable) to log
- */
- void errorId(String etjId, String msg, Throwable t);
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java
deleted file mode 100644
index a1a9462..0000000
--- a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.common.logger;
-
-public class AiravataLoggerFactory {
-
- public static AiravataLogger getLogger(Class aClass) {
- return new AiravataLoggerImpl(aClass);
- }
-
- public static AiravataLogger getLogger(String className) {
- return new AiravataLoggerImpl(className);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java
deleted file mode 100644
index 74ab401..0000000
--- a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.common.logger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AiravataLoggerImpl implements AiravataLogger{
-
- private Logger logger;
-
- public AiravataLoggerImpl(Class aClass) {
- logger = LoggerFactory.getLogger(aClass);
- }
-
- public AiravataLoggerImpl(String className) {
- logger = LoggerFactory.getLogger(className);
- }
-
-
- @Override
- public String getName() {
- return logger.getName();
- }
-
- @Override
- public boolean isTraceEnabled() {
- return logger.isTraceEnabled();
- }
-
- @Override
- public void trace(String msg) {
- logger.trace(msg);
- }
-
- @Override
- public void traceId(String etjId, String msg) {
- logger.trace(getAiravataLogMessage(etjId, msg));
- }
-
- @Override
- public void trace(String format, Object arg) {
- logger.trace(format, arg);
- }
-
- @Override
- public void traceId(String etjId, String format, Object arg) {
- logger.trace(getAiravataLogMessage(etjId, format), arg);
- }
-
- @Override
- public void trace(String format, Object arg1, Object arg2) {
- logger.trace(format, arg1, arg2);
- }
-
- @Override
- public void traceId(String etjId, String format, Object arg1, Object arg2) {
- logger.trace(getAiravataLogMessage(etjId,format), arg1, arg2);
- }
-
- @Override
- public void trace(String format, Object... arguments) {
- logger.trace(format, arguments);
- }
-
- @Override
- public void traceId(String etjId, String format, Object... arguments) {
- logger.trace(getAiravataLogMessage(etjId, format), arguments);
- }
-
- @Override
- public void trace(String msg, Throwable t) {
- logger.trace(msg, t);
- }
-
- @Override
- public void traceId(String etjId, String msg, Throwable t) {
- logger.trace(getAiravataLogMessage(etjId, msg), t);
- }
-
- @Override
- public boolean isDebugEnabled() {
- return logger.isDebugEnabled();
- }
-
- @Override
- public void debug(String msg) {
- logger.debug(msg);
- }
-
- @Override
- public void debugId(String etjId, String msg) {
- logger.debug(getAiravataLogMessage(etjId, msg));
- }
-
- @Override
- public void debug(String format, Object arg) {
- logger.debug(format, arg);
- }
-
- @Override
- public void debugId(String etjId, String format, Object arg) {
- logger.debug(getAiravataLogMessage(etjId, format), arg);
- }
-
- @Override
- public void debug(String format, Object arg1, Object arg2) {
- logger.debug(format, arg1, arg2);
- }
-
- @Override
- public void debugId(String etjId, String format, Object arg1, Object arg2) {
- logger.debug(getAiravataLogMessage(etjId, format), arg1, arg2);
- }
-
- @Override
- public void debug(String format, Object... arguments) {
- logger.debug(format, arguments);
- }
-
- @Override
- public void debugId(String etjId, String format, Object... arguments) {
- logger.debug(getAiravataLogMessage(etjId, format), arguments);
- }
-
- @Override
- public void debug(String msg, Throwable t) {
- logger.debug(msg, t);
- }
-
- @Override
- public void debugId(String etjId, String msg, Throwable t) {
- logger.debug(getAiravataLogMessage(etjId, msg), t);
- }
-
- @Override
- public boolean isInfoEnabled() {
- return logger.isInfoEnabled();
- }
-
- @Override
- public void info(String msg) {
- logger.info(msg);
- }
-
- @Override
- public void infoId(String etjId, String msg) {
- logger.info(getAiravataLogMessage(etjId, msg));
- }
-
- @Override
- public void info(String format, Object arg) {
- logger.info(format, arg);
- }
-
- @Override
- public void infoId(String etjId, String format, Object arg) {
- logger.info(getAiravataLogMessage(etjId, format), arg);
- }
-
- @Override
- public void info(String format, Object arg1, Object arg2) {
- logger.info(format, arg1, arg2);
- }
-
- @Override
- public void infoId(String etjId, String format, Object arg1, Object arg2) {
- logger.info(getAiravataLogMessage(etjId, format), arg1, arg2);
- }
-
- @Override
- public void info(String format, Object... arguments) {
- logger.info(format, arguments);
- }
-
- @Override
- public void infoId(String etjId, String format, Object... arguments) {
- logger.info(getAiravataLogMessage(etjId, format), arguments);
- }
-
- @Override
- public void info(String msg, Throwable t) {
- logger.info(msg, t);
- }
-
- @Override
- public void infoId(String etjId, String msg, Throwable t) {
- logger.info(getAiravataLogMessage(etjId, msg), t);
- }
-
- @Override
- public boolean isWarnEnabled() {
- return logger.isWarnEnabled();
- }
-
- @Override
- public void warn(String msg) {
- logger.warn(msg);
- }
-
- @Override
- public void warnId(String etjId, String msg) {
- logger.warn(getAiravataLogMessage(etjId, msg));
- }
-
- @Override
- public void warn(String format, Object arg) {
- logger.warn(format, arg);
- }
-
- @Override
- public void warnId(String etjId, String format, Object arg) {
- logger.warn(getAiravataLogMessage(etjId, format), arg);
- }
-
- @Override
- public void warn(String format, Object... arguments) {
- logger.warn(format, arguments);
- }
-
- @Override
- public void warnId(String etjId, String format, Object... arguments) {
- logger.warn(getAiravataLogMessage(etjId, format), arguments);
- }
-
- @Override
- public void warn(String format, Object arg1, Object arg2) {
- logger.warn(format, arg1, arg2);
- }
-
- @Override
- public void warnId(String etjId, String format, Object arg1, Object arg2) {
- logger.warn(getAiravataLogMessage(etjId, format), arg1, arg2);
- }
-
- @Override
- public void warn(String msg, Throwable t) {
- logger.warn(msg, t);
- }
-
- @Override
- public void warnId(String etjId, String msg, Throwable t) {
- logger.warn(getAiravataLogMessage(etjId, msg), t);
- }
-
- @Override
- public boolean isErrorEnabled() {
- return logger.isErrorEnabled();
- }
-
- @Override
- public void error(String msg) {
- logger.error(msg);
- }
-
- @Override
- public void errorId(String etjId, String msg) {
- logger.error(getAiravataLogMessage(etjId, msg));
- }
-
- @Override
- public void error(String format, Object arg) {
- logger.error(format, arg);
- }
-
- @Override
- public void errorId(String etjId, String format, Object arg) {
- logger.error(getAiravataLogMessage(etjId, format), arg);
- }
-
- @Override
- public void error(String format, Object arg1, Object arg2) {
- logger.error(format, arg1, arg2);
- }
-
- @Override
- public void errorId(String etjId, String format, Object arg1, Object arg2) {
- logger.error(getAiravataLogMessage(etjId, format), arg1, arg2);
- }
-
- @Override
- public void error(String format, Object... arguments) {
- logger.error(format, arguments);
- }
-
- @Override
- public void errorId(String etjId, String format, Object... arguments) {
- logger.error(getAiravataLogMessage(etjId, format), arguments);
- }
-
- @Override
- public void error(String msg, Throwable t) {
- logger.error(msg, t);
- }
-
- @Override
- public void errorId(String etjId, String msg, Throwable t) {
- logger.error(getAiravataLogMessage(etjId, msg), t);
- }
-
- private String getAiravataLogMessage(String etjId, String msg) {
- return new StringBuilder("Id:").append(etjId).append(" : ").append(msg).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 3f312fd..adcde31 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -25,9 +25,13 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ServerSettings extends ApplicationSettings {
+ private static final Logger log = LoggerFactory.getLogger(ServerSettings.class);
+
private static final String DEFAULT_USER = "default.registry.user";
private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
@@ -42,6 +46,8 @@ public class ServerSettings extends ApplicationSettings {
public static final String GFAC_SERVER_HOST = "gfac.server.host";
public static final String GFAC_SERVER_PORT = "gfac.server.port";
public static final String GFAC_SERVER_NAME = "gfac.server.name";
+ public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size";
+ public static final int DEFAULT_GFAC_THREAD_POOL_SIZE = 50;
public static final String GFAC_CONFIG_XML = "gfac-config.xml";
// Credential Store constants
public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host";
@@ -294,4 +300,20 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(GFAC_SERVER_PORT);
}
+ public static int getGFacThreadPoolSize() {
+ try {
+ String threadPoolSize = getSetting(GFAC_THREAD_POOL_SIZE);
+ if (threadPoolSize != null && !threadPoolSize.isEmpty()) {
+ return Integer.valueOf(threadPoolSize);
+ } else {
+ log.warn("Thread pool size is not configured, use default gfac thread pool size " +
+ DEFAULT_GFAC_THREAD_POOL_SIZE);
+ }
+ } catch (ApplicationSettingsException e) {
+ log.warn("Couldn't read thread pool size from configuration on exception, use default gfac thread pool " +
+ "size " + DEFAULT_GFAC_THREAD_POOL_SIZE);
+ }
+ return DEFAULT_GFAC_THREAD_POOL_SIZE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 9d20609..1e52a16 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -82,7 +82,6 @@ apiserver.host=localhost
apiserver.port=8930
apiserver.min.threads=50
-
###########################################################################
# Orchestrator Server Configurations
###########################################################################
@@ -108,6 +107,7 @@ enable.validation=true
gfac.server.name=gfac-node0
gfac.server.host=localhost
gfac.server.port=8950
+gfac.thread.pool.size=50
host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
@@ -170,8 +170,6 @@ email.from=airavata@apache.org
# Security Configuration used by Airavata Generic Factory Service
# to interact with Computational Resources.
#
-gfac.thread.pool.size=50
-airavata.server.thread.pool.size=50
gfac=org.apache.airavata.gfac.server.GfacServer
myproxy.server=myproxy.teragrid.org
myproxy.username=ogce
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
index fca4c98..ca3f5b2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
@@ -20,9 +20,8 @@
*/
package org.apache.airavata.gfac.core;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
@@ -34,33 +33,44 @@ import org.apache.curator.framework.CuratorFramework;
public interface GFac {
/**
- * Launching a process, this method run process inflow task and job submission task.
+ * Initialized method, this method must call one time before use any other method.
+ * @param experimentCatalog
+ * @param appCatalog
+ * @param curatorClient
+ * @param publisher
+ * @return
+ */
+ public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher);
+
+ /**
+ * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
+ * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
*
- * @param processContext
+ * @param experimentID
* @return boolean Successful acceptence of the jobExecution returns a true value
* @throws GFacException
*/
- public boolean submitProcess(ProcessContext processContext) throws GFacException;
+ public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException;
/**
- * This will invoke outflow tasks for a given process.
- * @param processContext
+ * This method can be used in a handler to ivvoke outhandler asynchronously
+ * @param jobExecutionContext
* @throws GFacException
*/
- public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException;
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
/**
- * This will reInvoke outflow tasks for a given process.
- * @param processContext
+ * This method can be used to handle re-run case asynchronously
+ * @param jobExecutionContext
* @throws GFacException
*/
- public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException;
+ public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
/**
- * This operation can be used to cancel an already running process.
+ * This operation can be used to cancel an already running experiment
* @return Successful cancellation will return true
* @throws GFacException
*/
- public boolean cancelProcess(ProcessContext processContext)throws GFacException;
+ public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
index 9ae8c99..5bf09bf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
@@ -21,15 +21,15 @@
package org.apache.airavata.gfac.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.ServerSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GFacThreadPoolExecutor {
- private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GFacThreadPoolExecutor.class);
+ private final static Logger logger = LoggerFactory.getLogger(GFacThreadPoolExecutor.class);
public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size";
private static ExecutorService threadPool;
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 5a6d51d..294c3a9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.core;
+import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -100,6 +101,10 @@ public class GFacUtils {
private GFacUtils() {
}
+ public static ProcessContext populateProcessContext(ProcessContext processContext) {
+ return processContext;
+ }
+
/**
* Read data from inputStream and convert it to String.
*
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
deleted file mode 100644
index 2219f3a..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.core;
-
-import org.apache.airavata.gfac.core.context.ProcessContext;
-
-public class GFacWorker implements Runnable {
-
-
- public GFacWorker(ProcessContext processContext) {
-
- }
-
- @Override
- public void run() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
index a78d3f0..23b5cca 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
@@ -117,6 +117,7 @@ public class BetterGfacImpl implements GFac {
return gfacInstance;
}
+ @Override
public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient,
LocalEventPublisher publisher) {
this.experimentCatalog = experimentCatalog;
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
new file mode 100644
index 0000000..cdb22f6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public class GFacEngine {
+
+ public static void createTaskChain(ProcessContext processContext) throws GFacException {
+
+ }
+
+ public static void executeProcess(ProcessContext processContext) throws GFacException {
+
+
+ }
+
+ public static void recoverProcess(ProcessContext processContext) throws GFacException {
+
+ }
+
+ public static void runProcessOutflow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ public static void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ public static void cancelProcess() throws GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
deleted file mode 100644
index 827ab55..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-
-public class GFacImpl implements GFac {
-
- @Override
- public boolean submitProcess(ProcessContext processContext) throws GFacException {
- return false;
- }
-
- @Override
- public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException {
-
- }
-
- @Override
- public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException {
-
- }
-
- @Override
- public boolean cancelProcess(ProcessContext processContext) throws GFacException {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
new file mode 100644
index 0000000..d8aa094
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GFacWorker implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(GFacWorker.class);
+ private final ProcessContext processContext;
+
+ public GFacWorker(ProcessContext processContext) throws AiravataException {
+ if (processContext == null) {
+ throw new AiravataException("Worker must initialize with valide processContext, Process context is null");
+ }
+ this.processContext = processContext;
+ }
+
+ @Override
+ public void run() {
+ ProcessType type = getProcessType(processContext);
+ try {
+ switch (type){
+ case NEW:
+ GFacUtils.populateProcessContext(processContext);
+ GFacEngine.createTaskChain(processContext);
+ GFacEngine.executeProcess(processContext);
+ break;
+ case RECOVER:
+ // recover the process
+ GFacEngine.recoverProcess(processContext);
+ break;
+ case OUTFLOW:
+ // run the outflow task
+ GFacEngine.runProcessOutflow(processContext);
+ break;
+ case RECOVER_OUTFLOW:
+ // recover outflow task;
+ GFacEngine.recoverProcessOutflow(processContext);
+ }
+ } catch (GFacException e) {
+ switch (type) {
+ case NEW: log.error("Process execution error", e); break;
+ case RECOVER: log.error("Process recover error ", e); break;
+ case OUTFLOW: log.error("Process outflow execution error",e); break;
+ case RECOVER_OUTFLOW: log.error("Process outflow recover error",e); break;
+ }
+ }
+ }
+
+ private ProcessType getProcessType(ProcessContext processContext) {
+ // check the status and return correct type of process.
+ return ProcessType.NEW;
+ }
+
+
+ private enum ProcessType {
+ NEW,
+ RECOVER,
+ OUTFLOW,
+ RECOVER_OUTFLOW
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 382cd5c..ec474d8 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -24,15 +24,14 @@ import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.GFacConstants;
-import org.apache.airavata.gfac.core.GFacWorker;
+import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.log.AiravataLogger;
+import org.apache.airavata.common.log.AiravataLoggerFactory;
import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
@@ -45,7 +44,6 @@ import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
import org.apache.airavata.gfac.impl.BetterGfacImpl;
-import org.apache.airavata.gfac.impl.InputHandlerWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
@@ -69,6 +67,8 @@ import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
@@ -77,9 +77,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class GfacServerHandler implements GfacService.Iface {
- private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
+ private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
private static int requestCount=0;
private ExperimentCatalog experimentCatalog;
@@ -92,13 +94,15 @@ public class GfacServerHandler implements GfacService.Iface {
private static File gfacConfigFile;
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
+ private ExecutorService executorService;
public GfacServerHandler() throws AiravataStartupException {
try {
startCuratorClient();
initZkDataStructure();
initAMQPClient();
- localEventPublisher = new LocalEventPublisher(new EventBus());
+ executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
+ localEventPublisher = new LocalEventPublisher(new EventBus());
experimentCatalog = RegistryFactory.getDefaultExpCatalog();
appCatalog = RegistryFactory.getAppCatalog();
startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
@@ -126,40 +130,23 @@ public class GfacServerHandler implements GfacService.Iface {
* - /gfac-node0 (localhost:2181)
*|/experiments
*/
- airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
+ airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
// create PERSISTENT nodes
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
// create EPHEMERAL server name node
String gfacName = ServerSettings.getGFacServerName();
- if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)) == null) {
+ if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ?
+ gfacName : "/" + gfacName)) == null) {
curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName));
+ .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" +
+ gfacName));
}
curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() +
(gfacName.startsWith("/") ? gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes());
}
- public static void main(String[] args) {
- RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
- try {
- rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
- rabbitMQTaskLaunchConsumer.listen(new TestHandler());
- } catch (AiravataException e) {
- logger.error(e.getMessage(), e);
- }
- }
-
- private long ByateArrayToLong(byte[] data) {
- long value = 0;
- for (int i = 0; i < data.length; i++)
- {
- value += ((long) data[i] & 0xffL) << (8 * i);
- }
- return value;
- }
-
public String getGFACServiceVersion() throws TException {
return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
}
@@ -181,42 +168,38 @@ public class GfacServerHandler implements GfacService.Iface {
* @param processId - processModel id in registry
* @param gatewayId - gateway Identification
*/
- public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws TException {
+ public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws
+ TException {
requestCount++;
- logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
- logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId, processId);
+ log.info("-----------------------------------" + requestCount + "-----------------------------------------");
+ log.info(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId,
+ processId);
ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
processContext.setAppCatalog(appCatalog);
processContext.setExperimentCatalog(experimentCatalog);
processContext.setCuratorClient(curatorClient);
processContext.setLocalEventPublisher(localEventPublisher);
-
- GFacWorker worker = new GFacWorker(processContext);
- InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId,
- processId, gatewayId, tokenId);
-// try {
-// if( gfac.submitJob(experimentId, taskId, gatewayId)){
- logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
- "{}", experimentId, processId, gatewayId);
-
- GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker);
-
- // we immediately return when we have a threadpool
+ try {
+ executorService.execute(new GFacWorker(processContext));
+ } catch (AiravataException e) {
+ log.error("Failed to submit process", e);
+ return false;
+ }
return true;
}
public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
- logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
+ log.info(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
try {
if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) {
- logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
+ log.debug(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
return true;
} else {
- logger.errorId(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId);
+ log.error(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId);
return false;
}
} catch (Exception e) {
- logger.errorId(experimentId, "Error cancelling the experiment {}.", experimentId);
+ log.error(experimentId, "Error cancelling the experiment {}.", experimentId);
throw new TException("Error cancelling the experiment : " + e.getMessage(), e);
}
}
@@ -247,11 +230,11 @@ public class GfacServerHandler implements GfacService.Iface {
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
abstractActivityListener.setup(publisher, experimentCatalog, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
- logger.info("Registering listener: " + listenerClass);
+ log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
} catch (Exception e) {
- logger.error("Error loading the listener classes configured in airavata-server.properties", e);
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
}
}
private static class TestHandler implements MessageHandler{
@@ -276,7 +259,7 @@ public class GfacServerHandler implements GfacService.Iface {
ThriftUtils.createThriftFromBytes(bytes, event);
System.out.println(event.getExperimentId());
} catch (TException e) {
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
}
@@ -320,13 +303,13 @@ public class GfacServerHandler implements GfacService.Iface {
AiravataZKUtils.getExpStatePath(event.getExperimentId());
submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
} catch (TException e) {
- logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+ log.error(e.getMessage(), e); //nobody is listening so nothing to throw
} catch (RegistryException e) {
- logger.error("Error while updating experiment status", e);
+ log.error("Error while updating experiment status", e);
}
} else if (message.getType().equals(MessageType.TERMINATETASK)) {
boolean cancelSuccess = false;
@@ -345,7 +328,7 @@ public class GfacServerHandler implements GfacService.Iface {
"This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled.");
}
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}finally {
if (cancelSuccess) {
// if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
@@ -358,7 +341,7 @@ public class GfacServerHandler implements GfacService.Iface {
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
} catch (Exception e) {
- logger.error("Error while ack to cancel request, experimentId: " + event.getExperimentId());
+ log.error("Error while ack to cancel request, experimentId: " + event.getExperimentId());
}
}
}
[2/6] airavata git commit: Refactored GfacServerHandler
initialization, changed GFac interface , added GfacImpl
Posted by sh...@apache.org.
Refactored GfacServerHandler initialization, changed GFac interface , added GfacImpl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8535ff1d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8535ff1d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8535ff1d
Branch: refs/heads/master
Commit: 8535ff1d24a228c793075b92cfdc7f08d6ab7bdb
Parents: 9500724
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Jun 12 12:05:18 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri Jun 12 12:05:18 2015 -0400
----------------------------------------------------------------------
.../AiravataExperimentStatusUpdator.java | 10 +-
.../exception/AiravataStartupException.java | 46 +++++
.../exception/AiravataStratupException.java | 9 -
.../apache/airavata/common/utils/Constants.java | 31 +--
.../common/utils/LocalEventPublisher.java | 47 +++++
.../airavata/common/utils/MonitorPublisher.java | 47 -----
.../airavata/common/utils/ServerSettings.java | 41 +++-
.../main/resources/airavata-server.properties | 109 +++++------
.../gfac/bes/provider/impl/BESProvider.java | 2 +-
.../gfac/bes/security/X509SecurityContext.java | 8 +-
.../gfac/bes/utils/DataTransferrer.java | 4 +-
.../apache/airavata/gfac/core/Constants.java | 82 --------
.../org/apache/airavata/gfac/core/GFac.java | 36 ++--
.../airavata/gfac/core/GFacConfiguration.java | 34 ++--
.../airavata/gfac/core/GFacConstants.java | 86 +++++++++
.../apache/airavata/gfac/core/GFacUtils.java | 16 +-
.../apache/airavata/gfac/core/GFacWorker.java | 37 ++++
.../apache/airavata/gfac/core/RequestData.java | 10 +-
.../apache/airavata/gfac/core/Scheduler.java | 22 +--
.../gfac/core/context/JobExecutionContext.java | 12 +-
.../gfac/core/context/ProcessContext.java | 89 +++++++++
.../gfac/core/handler/AbstractHandler.java | 6 +-
.../security/TokenizedMyProxyAuthInfo.java | 10 +-
.../gfac/impl/AiravataJobStatusUpdator.java | 10 +-
.../gfac/impl/AiravataTaskStatusUpdator.java | 12 +-
.../impl/AiravataWorkflowNodeStatusUpdator.java | 10 +-
.../airavata/gfac/impl/BetterGfacImpl.java | 95 +++++-----
.../org/apache/airavata/gfac/impl/GFacImpl.java | 28 +++
.../airavata/gfac/impl/OutHandlerWorker.java | 16 +-
.../gfac/local/provider/impl/LocalProvider.java | 10 +-
.../monitor/core/AiravataAbstractMonitor.java | 3 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 2 +-
.../handlers/GridPullMonitorHandler.java | 2 +-
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 16 +-
.../monitor/impl/push/amqp/AMQPMonitor.java | 16 +-
.../monitor/impl/push/amqp/BasicConsumer.java | 6 +-
.../gfac/ssh/handler/SSHOutputHandler.java | 4 +-
.../gfac/ssh/provider/impl/SSHProvider.java | 22 +--
.../gfac/ssh/security/TokenizedSSHAuthInfo.java | 10 +-
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 4 +-
.../gfac/services/impl/LocalProviderTest.java | 4 +-
.../apache/airavata/job/AMQPMonitorTest.java | 10 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 6 +-
.../airavata/gfac/server/GfacServerHandler.java | 189 +++++++------------
.../engine/util/ProxyMonitorPublisher.java | 8 +-
45 files changed, 697 insertions(+), 580 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 5f25f8a..1d1f1ed 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -26,7 +26,7 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
@@ -51,7 +51,7 @@ import java.util.Calendar;
public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
private ExperimentCatalog airavataExperimentCatalog;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private Publisher publisher;
private CuratorFramework curatorClient;
private RabbitMQTaskLaunchConsumer consumer;
@@ -111,7 +111,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(state,
nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
nodeStatus.getWorkflowNodeIdentity().getGatewayId());
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("EXPERIMENT");
MessageContext msgCntxt = new MessageContext(event, MessageType.EXPERIMENT, messageId, nodeStatus.getWorkflowNodeIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -203,8 +203,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
for (Object configuration : configurations) {
if (configuration instanceof ExperimentCatalog){
this.airavataExperimentCatalog =(ExperimentCatalog)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof LocalEventPublisher){
+ this.localEventPublisher =(LocalEventPublisher) configuration;
} else if (configuration instanceof Publisher){
this.publisher=(Publisher) configuration;
}else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java
new file mode 100644
index 0000000..2ec9f5a
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.exception;
+
+public class AiravataStartupException extends Exception {
+ private static final long serialVersionUID = 495204868100143133L;
+
+ public AiravataStartupException() {
+ super();
+ }
+
+ public AiravataStartupException(String message) {
+ super(message);
+ }
+
+ public AiravataStartupException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AiravataStartupException(Throwable cause) {
+ super(cause);
+ }
+
+ protected AiravataStartupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java
deleted file mode 100644
index e503f4b..0000000
--- a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.airavata.common.exception;
-
-/**
- * Created by syodage on 6/11/15.
- */
-public class AiravataStratupException extends Exception {
- private static final long serialVersionUID = 495204868100143133L;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 6855a8e..6e1cb84 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -27,37 +27,10 @@ package org.apache.airavata.common.utils;
public final class Constants {
public static final String USER_IN_SESSION = "userName";
-// public static final String GATEWAY_NAME = "gateway_id";
- public static final String GFAC_CONFIG_XML = "gfac-config.xml";
- public static final String PUSH = "push";
- public static final String PULL = "pull";
- public static final String API_SERVER_PORT = "apiserver.server.port";
- public static final String API_SERVER_HOST = "apiserver.server.host";
- public static final String REGISTRY_JDBC_URL = "registry.jdbc.url";
- public static final String APPCATALOG_JDBC_URL = "appcatalog.jdbc.url";
- public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
- public static final String RABBITMQ_EXCHANGE = "rabbitmq.exchange.name";
- public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
- public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
- public static final String GFAC_SERVER_HOST = "gfac.server.host";
- public static final String GFAC_SERVER_PORT = "gfac.server.port";
- public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host";
- public static final String CREDENTIAL_SERVER_PORT = "credential.store.server.port";
- public static final String ZOOKEEPER_EXPERIMENT_CATALOG = "experiment-catalog";
- public static final String ZOOKEEPER_APPCATALOG = "app-catalog";
- public static final String ZOOKEEPER_RABBITMQ = "rabbit-mq";
- public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host";
- public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port";
- public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server";
- public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server";
- public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server";
- public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments";
- public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
- public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
- public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
+
+
public static final String STAT = "stat";
public static final String JOB = "job";
- public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout";
//API security relates property names
public static final String IS_API_SECURED = "api.secured";
public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server";
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java
new file mode 100644
index 0000000..2b5a1d8
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.common.utils;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalEventPublisher {
+ private final static Logger logger = LoggerFactory.getLogger(LocalEventPublisher.class);
+ private EventBus eventBus;
+
+ public LocalEventPublisher(EventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ public void registerListener(Object listener) {
+ eventBus.register(listener);
+ }
+
+ public void unregisterListener(Object listener) {
+ eventBus.unregister(listener);
+ }
+
+ public void publish(Object o) {
+ eventBus.post(o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
deleted file mode 100644
index 7f64e86..0000000
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.common.utils;
-
-import com.google.common.eventbus.EventBus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MonitorPublisher{
- private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
- private EventBus eventBus;
-
- public MonitorPublisher(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- public void registerListener(Object listener) {
- eventBus.register(listener);
- }
-
- public void unregisterListener(Object listener) {
- eventBus.unregister(listener);
- }
-
- public void publish(Object o) {
- eventBus.post(o);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 8370e40..3f312fd 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -33,8 +33,24 @@ public class ServerSettings extends ApplicationSettings {
private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
private static final String SERVER_CONTEXT_ROOT = "server.context-root";
- public static final String EMBEDDED_ZK = "embedded.zk";
public static final String IP = "ip";
+ // Orchestrator Constants
+ public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
+
+ public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
+ // Gfac constants
+ public static final String GFAC_SERVER_HOST = "gfac.server.host";
+ public static final String GFAC_SERVER_PORT = "gfac.server.port";
+ public static final String GFAC_SERVER_NAME = "gfac.server.name";
+ public static final String GFAC_CONFIG_XML = "gfac-config.xml";
+ // Credential Store constants
+ public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host";
+ public static final String CREDENTIAL_SERVER_PORT = "credential.store.server.port";
+ // Zookeeper + curator constants
+ public static final String EMBEDDED_ZK = "embedded.zk";
+ public static final String ZOOKEEPER_SERVER_CONNECTION = "zookeeper.server.connection";
+ public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout";
+
private static final String CREDENTIAL_STORE_DB_URL = "credential.store.jdbc.url";
private static final String CREDENTIAL_STORE_DB_USER = "credential.store.jdbc.user";
@@ -57,7 +73,6 @@ public class ServerSettings extends ApplicationSettings {
public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
- public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled
public static final String LAUNCH_QUEUE_NAME = "launch.queue.name";
public static final String CANCEL_QUEUE_NAME = "cancel.queue.name";
@@ -186,11 +201,6 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(TASK_LAUNCH_PUBLISHER);
}
- public static boolean isGFacPassiveMode()throws ApplicationSettingsException {
- String setting = getSetting(GFAC_PASSIVE);
- return Boolean.parseBoolean(setting);
- }
-
public static boolean isEmbeddedZK() {
return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
}
@@ -267,4 +277,21 @@ public class ServerSettings extends ApplicationSettings {
public static String getAdminPassword() throws ApplicationSettingsException {
return getSetting(Constants.ADMIN_PASSWORD);
}
+
+ public static String getZookeeperConnection() throws ApplicationSettingsException {
+ return getSetting(ZOOKEEPER_SERVER_CONNECTION, "localhost:2181");
+ }
+
+ public static String getGFacServerName() throws ApplicationSettingsException {
+ return getSetting(GFAC_SERVER_NAME);
+ }
+
+ public static String getGfacServerHost() throws ApplicationSettingsException {
+ return getSetting(GFAC_SERVER_HOST);
+ }
+
+ public static String getGFacServerPort() throws ApplicationSettingsException {
+ return getSetting(GFAC_SERVER_PORT);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index b5c29a5..9d20609 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -68,20 +68,60 @@ appcatalog.validationQuery=SELECT 1 from CONFIGURATION
###########################################################################
# Server module Configuration
###########################################################################
-
servers=apiserver,orchestrator,gfac,credentialstore
#shutdown.trategy=NONE
shutdown.trategy=SELF_TERMINATE
-apiserver.server.host=localhost
-apiserver.server.port=8930
-apiserver.server.min.threads=50
+###########################################################################
+# API Server Configurations
+###########################################################################
+apiserver=org.apache.airavata.api.server.AiravataAPIServer
+apiserver.name=apiserver-node0
+apiserver.host=localhost
+apiserver.port=8930
+apiserver.min.threads=50
+
+
+###########################################################################
+# Orchestrator Server Configurations
+###########################################################################
+orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
+orchestrator.server.name=orchestrator-node0
orchestrator.server.host=localhost
orchestrator.server.port=8940
+orchestrator.server.min.threads=50
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
+job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
+submitter.interval=10000
+threadpool.size=10
+start.submitter=true
+embedded.mode=true
+enable.validation=true
+
+
+###########################################################################
+# GFac Server Configurations
+###########################################################################
+gfac.server.name=gfac-node0
gfac.server.host=localhost
gfac.server.port=8950
-orchestrator.server.min.threads=50
+host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
+
+
+
+###########################################################################
+# Airavata Workflow Interpreter Configurations
+###########################################################################
+workflowserver=org.apache.airavata.api.server.WorkflowServer
+enactment.thread.pool.size=10
+
+#to define custom workflow parser user following property
+#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowParser
+
+
###########################################################################
# Job Scheduler can send informative email messages to you about the status of your job.
@@ -139,7 +179,6 @@ myproxy.password=
myproxy.life=3600
# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
-gfac.passive=true
# SSH PKI key pair or ssh password can be used SSH based authentication is used.
# if user specify both password authentication gets the higher preference
@@ -156,33 +195,6 @@ gfac.passive=true
#bes.ca.key.path=<location>/certificates/cakey.pem
#bes.ca.key.pass=passphrase
-
-###########################################################################
-# Airavata Workflow Interpreter Configurations
-###########################################################################
-
-#runInThread=true
-#provenance=true
-#provenanceWriterThreadPoolSize=20
-#gfac.embedded=true
-#workflowserver=org.apache.airavata.api.server.WorkflowServer
-enactment.thread.pool.size=10
-
-#to define custom workflow parser user following property
-#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowParser
-
-
-###########################################################################
-# API Server module Configuration
-###########################################################################
-apiserver=org.apache.airavata.api.server.AiravataAPIServer
-
-###########################################################################
-# Workflow Server module Configuration
-###########################################################################
-
-workflowserver=org.apache.airavata.api.server.WorkflowServer
-
###########################################################################
# Advance configuration to change service implementations
###########################################################################
@@ -191,7 +203,6 @@ TwoPhase=true
#
# Class which implemented HostScheduler interface. It will determine the which host to submit the request
#
-host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler
###########################################################################
# Monitoring module Configuration
@@ -213,15 +224,11 @@ email.based.monitoring.period=10000
###########################################################################
# AMQP Notification Configuration
###########################################################################
-
-
amqp.notification.enable=1
-
amqp.broker.host=localhost
amqp.broker.port=5672
amqp.broker.username=guest
amqp.broker.password=guest
-
amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
@@ -251,35 +258,11 @@ activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
rabbitmq.exchange.name=airavata_rabbitmq_exchange
###########################################################################
-# Orchestrator module Configuration
-###########################################################################
-
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
-job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
-submitter.interval=10000
-threadpool.size=10
-start.submitter=true
-embedded.mode=true
-enable.validation=true
-orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-
-###########################################################################
# Zookeeper Server Configuration
###########################################################################
-
embedded.zk=true
-zookeeper.server.host=localhost
-zookeeper.server.port=2181
-airavata-server=/api-server
+zookeeper.server.connection=localhost:2181
zookeeper.timeout=30000
-orchestrator-server=/orchestrator-server
-gfac-server=/gfac-server
-gfac-experiments=/gfac-experiments
-gfac-server-name=gfac-node0
-orchestrator-server-name=orch-node0
-airavata-server-name=api-node0
########################################################################
## API Security Configuration
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index b166593..e629150 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -449,6 +449,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.debug(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
"experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getTaskId());
- jobExecutionContext.getMonitorPublisher().publish(jobStatus);
+ jobExecutionContext.getLocalEventPublisher().publish(jobStatus);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java
index ff60e99..d9b183f 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java
@@ -26,8 +26,8 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.credential.Credential;
import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.context.AbstractSecurityContext;
-import org.apache.airavata.gfac.core.Constants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.RequestData;
import org.apache.airavata.gfac.bes.utils.MyProxyLogon;
@@ -105,13 +105,13 @@ public class X509SecurityContext extends AbstractSecurityContext {
log.info("Current directory " + f.getAbsolutePath());
throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
} else {
- System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+ System.setProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
}
}
private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
- String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION);
+ String trustedCertificatePath = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION);
setUpTrustedCertificatePath(trustedCertificatePath);
}
@@ -122,7 +122,7 @@ public class X509SecurityContext extends AbstractSecurityContext {
* @return The trusted certificate path as a string.
*/
public static String getTrustedCertificatePath() {
- return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY);
+ return System.getProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 623a0b6..f20f18b 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -31,7 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -209,7 +209,7 @@ public class DataTransferrer {
String temp = null;
while ((temp = instream.readLine()) != null) {
buff.append(temp);
- buff.append(Constants.NEWLINE);
+ buff.append(GFacConstants.NEWLINE);
}
log.info("finish read file:" + localFile);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java
deleted file mode 100644
index 9f89256..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.core;
-
-public class Constants {
- public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
- public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
- public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
-
- public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
- public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
- public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
- public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler";
-
-
- public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='";
- public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
- public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
- public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
- public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
-
- public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
- public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
- public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
- public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
- public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
- public static final String NEWLINE = System.getProperty("line.separator");
- public static final String INPUT_DATA_DIR_VAR_NAME = "input";
- public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
- public static final int DEFAULT_GSI_FTP_PORT = 2811;
- public static final String _127_0_0_1 = "127.0.0.1";
- public static final String LOCALHOST = "localhost";
-
- public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
- public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
- public static final String PROP_BROKER_URL = "broker.url";
- public static final String PROP_TOPIC = "topic";
- public static final String SPACE = " ";
- public static final int COMMAND_EXECUTION_TIMEOUT = 5;
- public static final String EXECUTABLE_NAME = "run.sh";
-
- public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
- public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
- public static final String MYPROXY_SERVER = "myproxy.server";
- public static final String MYPROXY_SERVER_PORT = "myproxy.port";
- public static final String MYPROXY_USER = "myproxy.username";
- public static final String MYPROXY_PASS = "myproxy.password";
- public static final String MYPROXY_LIFE = "myproxy.life";
- /*
- * SSH properties
- */
- public static final String SSH_PRIVATE_KEY = "private.ssh.key";
- public static final String SSH_PUBLIC_KEY = "public.ssh.key";
- public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
- public static final String SSH_USER_NAME = "ssh.username";
- public static final String SSH_PASSWORD = "ssh.password";
- public static final String PROPERTY = "property";
- public static final String NAME = "name";
- public static final String VALUE = "value";
- public static final String OUTPUT_DATA_DIR = "output.location";
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
index a8c0d58..fca4c98 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java
@@ -20,8 +20,9 @@
*/
package org.apache.airavata.gfac.core;
+import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
@@ -33,44 +34,33 @@ import org.apache.curator.framework.CuratorFramework;
public interface GFac {
/**
- * Initialized method, this method must call one time before use any other method.
- * @param experimentCatalog
- * @param appCatalog
- * @param curatorClient
- * @param publisher
- * @return
- */
- public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, MonitorPublisher publisher);
-
- /**
- * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
- * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
+ * Launching a process, this method run process inflow task and job submission task.
*
- * @param experimentID
+ * @param processContext
* @return boolean Successful acceptence of the jobExecution returns a true value
* @throws GFacException
*/
- public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException;
+ public boolean submitProcess(ProcessContext processContext) throws GFacException;
/**
- * This method can be used in a handler to ivvoke outhandler asynchronously
- * @param jobExecutionContext
+ * This will invoke outflow tasks for a given process.
+ * @param processContext
* @throws GFacException
*/
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
+ public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException;
/**
- * This method can be used to handle re-run case asynchronously
- * @param jobExecutionContext
+ * This will reInvoke outflow tasks for a given process.
+ * @param processContext
* @throws GFacException
*/
- public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
+ public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException;
/**
- * This operation can be used to cancel an already running experiment
+ * This operation can be used to cancel an already running process.
* @return Successful cancellation will return true
* @throws GFacException
*/
- public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException;
+ public boolean cancelProcess(ProcessContext processContext)throws GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java
index ae82a72..3d2a320 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java
@@ -94,15 +94,15 @@ public class GFacConfiguration {
public void setInHandlers(String providerName, String applicationName) {
try {
- this.inHandlers = getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ this.inHandlers = getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
if (applicationName != null) {
- String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END;
- List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ String xPath = GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + GFacConstants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END;
+ List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
this.inHandlers.addAll(handlers);
}
if (providerName != null) {
- String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END;
- List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
+ String xPath = GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + GFacConstants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END;
+ List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
this.inHandlers.addAll(handlers);
}
} catch (XPathExpressionException e) {
@@ -112,15 +112,15 @@ public class GFacConfiguration {
public void setOutHandlers(String providerName, String applicationName) {
try {
- this.outHandlers = getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ this.outHandlers = getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
if (applicationName != null) {
- String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END;
- List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ String xPath = GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + GFacConstants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END;
+ List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
this.outHandlers.addAll(handlers);
}
if(providerName != null) {
- String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END;
- List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ String xPath = GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + GFacConstants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END;
+ List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
this.outHandlers.addAll(handlers);
}
} catch (XPathExpressionException e) {
@@ -200,9 +200,9 @@ public class GFacConfiguration {
className = ((Element) nl.item(i)).getAttribute(attribute);
NodeList childNodes = (nl.item(i)).getChildNodes();
for(int j = 0;j < childNodes.getLength();j++){
- if(Constants.PROPERTY.equals(childNodes.item(j).getNodeName())) {
- String name = ((Element) childNodes.item(j)).getAttribute(Constants.NAME);
- String value = ((Element) childNodes.item(j)).getAttribute(Constants.VALUE);
+ if(GFacConstants.PROPERTY.equals(childNodes.item(j).getNodeName())) {
+ String name = ((Element) childNodes.item(j)).getAttribute(GFacConstants.NAME);
+ String value = ((Element) childNodes.item(j)).getAttribute(GFacConstants.VALUE);
properties.put(name, value);
}
}
@@ -226,9 +226,9 @@ public class GFacConfiguration {
if (className != null && !className.equals("")) {
NodeList childNodes = (nl.item(i)).getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
- if (Constants.PROPERTY.equals(childNodes.item(j).getNodeName())) {
- String name = ((Element) childNodes.item(j)).getAttribute(Constants.NAME);
- String value = ((Element) childNodes.item(j)).getAttribute(Constants.VALUE);
+ if (GFacConstants.PROPERTY.equals(childNodes.item(j).getNodeName())) {
+ String name = ((Element) childNodes.item(j)).getAttribute(GFacConstants.NAME);
+ String value = ((Element) childNodes.item(j)).getAttribute(GFacConstants.VALUE);
properties.put(name, value);
}
}
@@ -273,7 +273,7 @@ public class GFacConfiguration {
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
handlerDoc = docBuilder.parse(configFile);
- return getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_DAEMON_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ return getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_DAEMON_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
}
public static Document getHandlerDoc() {
return handlerDoc;
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
new file mode 100644
index 0000000..621eeac
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.core;
+
+public class GFacConstants {
+ public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
+ public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+ public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
+
+ public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
+ public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler";
+
+
+ public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='";
+ public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
+ public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
+ public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
+ public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
+ public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
+ public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
+ public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
+ public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
+ public static final String NEWLINE = System.getProperty("line.separator");
+ public static final String INPUT_DATA_DIR_VAR_NAME = "input";
+ public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
+ public static final int DEFAULT_GSI_FTP_PORT = 2811;
+ public static final String _127_0_0_1 = "127.0.0.1";
+ public static final String LOCALHOST = "localhost";
+
+ public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
+ public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
+ public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+
+ public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
+ public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
+ public static final String PROP_BROKER_URL = "broker.url";
+ public static final String PROP_TOPIC = "topic";
+ public static final String SPACE = " ";
+ public static final int COMMAND_EXECUTION_TIMEOUT = 5;
+ public static final String EXECUTABLE_NAME = "run.sh";
+
+ public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
+ public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
+ public static final String MYPROXY_SERVER = "myproxy.server";
+ public static final String MYPROXY_SERVER_PORT = "myproxy.port";
+ public static final String MYPROXY_USER = "myproxy.username";
+ public static final String MYPROXY_PASS = "myproxy.password";
+ public static final String MYPROXY_LIFE = "myproxy.life";
+ /*
+ * SSH properties
+ */
+ public static final String SSH_PRIVATE_KEY = "private.ssh.key";
+ public static final String SSH_PUBLIC_KEY = "public.ssh.key";
+ public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
+ public static final String SSH_USER_NAME = "ssh.username";
+ public static final String SSH_PASSWORD = "ssh.password";
+ public static final String PROPERTY = "property";
+ public static final String NAME = "name";
+ public static final String VALUE = "value";
+ public static final String OUTPUT_DATA_DIR = "output.location";
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 65c305b..5a6d51d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -25,7 +25,7 @@ import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
@@ -196,7 +196,7 @@ public class GFacUtils {
String temp = null;
while ((temp = instream.readLine()) != null) {
buff.append(temp);
- buff.append(Constants.NEWLINE);
+ buff.append(GFacConstants.NEWLINE);
}
return buff.toString();
} finally {
@@ -214,7 +214,7 @@ public class GFacUtils {
throws UnknownHostException {
String localHost = InetAddress.getLocalHost().getCanonicalHostName();
return (localHost.equals(appHost)
- || Constants.LOCALHOST.equals(appHost) || Constants._127_0_0_1
+ || GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1
.equals(appHost));
}
@@ -267,7 +267,7 @@ public class GFacUtils {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
- jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent);
+ jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
} catch (Exception e) {
throw new GFacException("Error persisting job status"
+ e.getLocalizedMessage(), e);
@@ -510,7 +510,7 @@ public class GFacUtils {
* @throws InterruptedException
*/
public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
- String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
List<String> children = curatorClient.getChildren().forPath(experimentNode);
for (String pickedChild : children) {
String experimentPath = experimentNode + File.separator + pickedChild;
@@ -729,11 +729,15 @@ public class GFacUtils {
return false;
}
- public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){
+ public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskState state){
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
}
+
+ public static String getZKGfacServersParentPath() {
+ return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
new file mode 100644
index 0000000..2219f3a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.core;
+
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public class GFacWorker implements Runnable {
+
+
+ public GFacWorker(ProcessContext processContext) {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java
index 73538b0..a396861 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java
@@ -90,7 +90,7 @@ public class RequestData {
public String getMyProxyServerUrl() throws ApplicationSettingsException {
if (myProxyServerUrl == null) {
- myProxyServerUrl = ServerSettings.getSetting(Constants.MYPROXY_SERVER);
+ myProxyServerUrl = ServerSettings.getSetting(GFacConstants.MYPROXY_SERVER);
}
return myProxyServerUrl;
}
@@ -102,7 +102,7 @@ public class RequestData {
public int getMyProxyPort() {
if (myProxyPort == 0) {
- String sPort = ServerSettings.getSetting(Constants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT));
+ String sPort = ServerSettings.getSetting(GFacConstants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT));
myProxyPort = Integer.parseInt(sPort);
}
@@ -115,7 +115,7 @@ public class RequestData {
public String getMyProxyUserName() throws ApplicationSettingsException {
if (myProxyUserName == null) {
- myProxyUserName = ServerSettings.getSetting(Constants.MYPROXY_USER);
+ myProxyUserName = ServerSettings.getSetting(GFacConstants.MYPROXY_USER);
}
return myProxyUserName;
@@ -128,14 +128,14 @@ public class RequestData {
public String getMyProxyPassword() throws ApplicationSettingsException {
if (myProxyPassword == null) {
- myProxyPassword = ServerSettings.getSetting(Constants.MYPROXY_PASS);
+ myProxyPassword = ServerSettings.getSetting(GFacConstants.MYPROXY_PASS);
}
return myProxyPassword;
}
public int getMyProxyLifeTime() {
- String life = ServerSettings.getSetting(Constants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime));
+ String life = ServerSettings.getSetting(GFacConstants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime));
myProxyLifeTime = Integer.parseInt(life);
return myProxyLifeTime;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java
index dc5ede8..606c385 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java
@@ -101,7 +101,7 @@ public class Scheduler {
String providerClassName = null;
try {
aClass = GFacConfiguration.getProviderConfig(handlerDoc,
- Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
+ GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
// This should be have a single element only.
if (aClass != null && !aClass.isEmpty()) {
s = aClass.get(0);
@@ -136,13 +136,13 @@ public class Scheduler {
unicoreSubmission = appCatalog.getComputeResource().getUNICOREJobSubmission(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
securityProtocol = unicoreSubmission.getSecurityProtocol().toString();
}
- List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']");
+ List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']");
for (Element element : elements) {
- String security = element.getAttribute(Constants.GFAC_CONFIG_SECURITY_ATTRIBUTE);
+ String security = element.getAttribute(GFacConstants.GFAC_CONFIG_SECURITY_ATTRIBUTE);
if (security.equals("")) {
- providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ providerClassName = element.getAttribute(GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
}else if (securityProtocol != null && securityProtocol.equals(security)) {
- providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ providerClassName = element.getAttribute(GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE);
}
}
if (providerClassName == null) {
@@ -152,8 +152,8 @@ public class Scheduler {
Class<? extends GFacProvider> aClass1 = Class.forName(providerClassName).asSubclass(GFacProvider.class);
provider = aClass1.newInstance();
//loading the provider properties
- aClass = GFacConfiguration.getProviderConfig(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_HANDLERS_START +
- providerClassName + "']", Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
+ aClass = GFacConfiguration.getProviderConfig(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START +
+ providerClassName + "']", GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
if (!aClass.isEmpty()) {
provider.initProperties(aClass.get(0).getProperties());
}
@@ -200,18 +200,18 @@ public class Scheduler {
String executionMode = "sync";
try {
executionMode = GFacConfiguration.getAttributeValue(handlerDoc,
- Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
+ GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
// This should be have a single element only.
if (executionMode == null || "".equals(executionMode)) {
String hostClass = jobExecutionContext.getPreferredJobSubmissionProtocol().toString();
- executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
+ executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
}
if (executionMode == null || "".equals(executionMode)) {
- List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']");
+ List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']");
for (Element element : elements) {
- executionMode = element.getAttribute(Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
+ executionMode = element.getAttribute(GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 8183dec..b240901 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -27,10 +27,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.GFacConfiguration;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.SecurityContext;
@@ -144,7 +144,7 @@ public class JobExecutionContext extends AbstractContext implements Serializable
private String status;
private List<String> outputFileList;
private ExperimentCatalog experimentCatalog;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
public String getGatewayID() {
return gatewayID;
@@ -486,11 +486,11 @@ public class JobExecutionContext extends AbstractContext implements Serializable
this.loginUserName = loginUserName;
}
- public MonitorPublisher getMonitorPublisher() {
- return monitorPublisher;
+ public LocalEventPublisher getLocalEventPublisher() {
+ return localEventPublisher;
}
- public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
- this.monitorPublisher = monitorPublisher;
+ public void setLocalEventPublisher(LocalEventPublisher localEventPublisher) {
+ this.localEventPublisher = localEventPublisher;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
new file mode 100644
index 0000000..4cab291
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.core.context;
+
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.curator.framework.CuratorFramework;
+
+public class ProcessContext {
+ // process model
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private CuratorFramework curatorClient;
+ private LocalEventPublisher localEventPublisher;
+ private final String processId;
+ private final String gatewayId;
+ private final String tokenId;
+
+ public ProcessContext(String processId, String gatewayId, String tokenId) {
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ // Getters and Setters
+ public ExperimentCatalog getExperimentCatalog() {
+ return experimentCatalog;
+ }
+
+ public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ }
+
+ public AppCatalog getAppCatalog() {
+ return appCatalog;
+ }
+
+ public void setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public String getTokenId() {
+ return tokenId;
+ }
+
+ public String getProcessId() {
+ return processId;
+ }
+
+ public CuratorFramework getCuratorClient() {
+ return curatorClient;
+ }
+
+ public void setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ }
+
+ public LocalEventPublisher getLocalEventPublisher() {
+ return localEventPublisher;
+ }
+
+ public void setLocalEventPublisher(LocalEventPublisher localEventPublisher) {
+ this.localEventPublisher = localEventPublisher;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index fcfd7f1..6a28986 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.gfac.core.handler;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.states.GfacHandlerState;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -39,11 +39,11 @@ public abstract class AbstractHandler implements GFacHandler {
private static final Logger logger = LoggerFactory.getLogger(AbstractHandler.class);
protected ExperimentCatalog experimentCatalog = null;
- protected MonitorPublisher publisher = null;
+ protected LocalEventPublisher publisher = null;
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
try {
- publisher = jobExecutionContext.getMonitorPublisher();
+ publisher = jobExecutionContext.getLocalEventPublisher();
GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.error("Error saving Recoverable provider state", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
index 5b1e3a2..36cb84f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
@@ -25,7 +25,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.credential.Credential;
import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.RequestData;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -79,13 +79,13 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
log.info("Current directory " + f.getAbsolutePath());
throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
} else {
- System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+ System.setProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
}
}
private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
- String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION);
+ String trustedCertificatePath = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION);
setUpTrustedCertificatePath(trustedCertificatePath);
}
@@ -94,7 +94,7 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
this.credentialReader = credentialReader;
this.requestData = requestData;
try {
- properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+ properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION));
} catch (ApplicationSettingsException e) {
log.error("Error while reading server properties", e);
};
@@ -103,7 +103,7 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
public TokenizedMyProxyAuthInfo(RequestData requestData) {
this.requestData = requestData;
try {
- properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+ properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION));
} catch (ApplicationSettingsException e) {
log.error("Error while reading server properties", e);
};
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
index 46659ff..eee66c2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.impl;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
@@ -43,7 +43,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
private ExperimentCatalog airavataExperimentCatalog;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private Publisher publisher;
@@ -71,7 +71,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
logger.debug("expId - {}: Publishing job status for " + jobStatus.getJobIdentity().getJobId() + ":"
+ state.toString(),jobStatus.getJobIdentity().getExperimentId());
JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("JOB");
MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -110,8 +110,8 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
for (Object configuration : configurations) {
if (configuration instanceof ExperimentCatalog){
this.airavataExperimentCatalog =(ExperimentCatalog)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof LocalEventPublisher){
+ this.localEventPublisher =(LocalEventPublisher) configuration;
} else if (configuration instanceof Publisher){
this.publisher=(Publisher) configuration;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
index 7cfa7ca..e64952c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.impl;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
@@ -45,7 +45,7 @@ import java.util.Calendar;
public class AiravataTaskStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
private ExperimentCatalog airavataExperimentCatalog;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private Publisher publisher;
public ExperimentCatalog getAiravataExperimentCatalog() {
@@ -63,7 +63,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":"
+ taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId());
TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity());
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("TASK");
MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -107,7 +107,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getGatewayId());
TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("TASK");
MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -144,8 +144,8 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
for (Object configuration : configurations) {
if (configuration instanceof ExperimentCatalog){
this.airavataExperimentCatalog =(ExperimentCatalog)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof LocalEventPublisher){
+ this.localEventPublisher =(LocalEventPublisher) configuration;
} else if (configuration instanceof Publisher){
this.publisher=(Publisher) configuration;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
index ddec551..53c29d1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.impl;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
@@ -44,7 +44,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
private ExperimentCatalog airavataExperimentCatalog;
- private MonitorPublisher monitorPublisher;
+ private LocalEventPublisher localEventPublisher;
private Publisher publisher;
@@ -88,7 +88,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
taskStatus.getTaskIdentity().getExperimentId(),
taskStatus.getTaskIdentity().getGatewayId());
WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
- monitorPublisher.publish(event);
+ localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("WFNODE");
MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, taskStatus.getTaskIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -119,8 +119,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
for (Object configuration : configurations) {
if (configuration instanceof ExperimentCatalog){
this.airavataExperimentCatalog =(ExperimentCatalog)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof LocalEventPublisher){
+ this.localEventPublisher =(LocalEventPublisher) configuration;
} else if (configuration instanceof Publisher){
this.publisher=(Publisher) configuration;
}
[4/6] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/be5c037e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/be5c037e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/be5c037e
Branch: refs/heads/master
Commit: be5c037ec5f4d2c7058f13f9343745909dbe2b48
Parents: 2654de0 8a6b891
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Jun 12 14:40:39 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri Jun 12 14:40:39 2015 -0400
----------------------------------------------------------------------
.../airavata-data-models/process_model.thrift | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[5/6] airavata git commit: Refactored airavata-server.properties file.
Posted by sh...@apache.org.
Refactored airavata-server.properties file.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/debeae01
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/debeae01
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/debeae01
Branch: refs/heads/master
Commit: debeae018fde8dab0c1617a1d1a912090f0ccbec
Parents: be5c037
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Jun 12 15:34:40 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri Jun 12 15:34:40 2015 -0400
----------------------------------------------------------------------
.../main/resources/airavata-server.properties | 42 +-------------------
1 file changed, 1 insertion(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/debeae01/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 1e52a16..a86a1d4 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -104,6 +104,7 @@ enable.validation=true
###########################################################################
# GFac Server Configurations
###########################################################################
+gfac=org.apache.airavata.gfac.server.GfacServer
gfac.server.name=gfac-node0
gfac.server.host=localhost
gfac.server.port=8950
@@ -152,10 +153,6 @@ credentialstore=org.apache.airavata.credential.store.server.CredentialStoreServe
credential.store.thrift.server.keystore=/Users/chathuri/dev/airavata/credential-store/oa4mp/airavata.jks
credential.store.thrift.server.keystore.password=airavata
-notifier.enabled=false
-#period in milliseconds
-notifier.duration=5000
-
email.server=smtp.googlemail.com
email.server.port=465
email.user=airavata
@@ -163,20 +160,6 @@ email.password=xxx
email.ssl=true
email.from=airavata@apache.org
-###########################################################################
-# Airavata GFac MyProxy GSI credentials to access Grid Resources.
-###########################################################################
-#
-# Security Configuration used by Airavata Generic Factory Service
-# to interact with Computational Resources.
-#
-gfac=org.apache.airavata.gfac.server.GfacServer
-myproxy.server=myproxy.teragrid.org
-myproxy.username=ogce
-myproxy.password=
-myproxy.life=3600
-# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
-trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
# SSH PKI key pair or ssh password can be used SSH based authentication is used.
# if user specify both password authentication gets the higher preference
@@ -194,15 +177,6 @@ trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
#bes.ca.key.pass=passphrase
###########################################################################
-# Advance configuration to change service implementations
-###########################################################################
-# If false, disables two phase commit when submitting jobs
-TwoPhase=true
-#
-# Class which implemented HostScheduler interface. It will determine the which host to submit the request
-#
-
-###########################################################################
# Monitoring module Configuration
###########################################################################
@@ -222,20 +196,6 @@ email.based.monitoring.period=10000
###########################################################################
# AMQP Notification Configuration
###########################################################################
-amqp.notification.enable=1
-amqp.broker.host=localhost
-amqp.broker.port=5672
-amqp.broker.username=guest
-amqp.broker.password=guest
-amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
-amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
-amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
-
-#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
-#This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
-amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
-proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
-connection.name=xsede
#publisher
#for simple scenarios we can use the guest user
rabbitmq.broker.url=amqp://localhost:5672
[6/6] airavata git commit: Updated thrift model structure.
Posted by sh...@apache.org.
Updated thrift model structure.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e76851d0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e76851d0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e76851d0
Branch: refs/heads/master
Commit: e76851d04f11c2863fb8ef227e2d3304790f938a
Parents: debeae0
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Jun 12 17:36:14 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri Jun 12 17:36:14 2015 -0400
----------------------------------------------------------------------
.../airavata-api/airavata_api.thrift | 27 +-
.../airavata-api/airavata_commons.thrift | 54 +++
.../airavata-api/airavata_errors.thrift | 15 -
.../application_interface_model.thrift | 96 +----
.../airavata-api/application_io_models.thrift | 118 ++++++
.../airavata-api/experiment_model.thrift | 399 ++-----------------
.../airavata-api/messaging_events.thrift | 27 +-
.../airavata-api/process_model.thrift | 56 +++
.../airavata-api/scheduling_model.thrift | 39 ++
.../airavata-api/status_models.thrift | 102 +++++
.../airavata-api/task_model.thrift | 75 ++++
.../airavata-api/workflow_data_model.thrift | 6 +-
.../airavata-api/workspace_model.thrift | 4 +-
.../airavata_commons.thrift | 23 +-
.../experiment_model.thrift | 105 +++++
.../airavata-data-models/process_model.thrift | 23 +-
.../airavata-data-models/status_models.thrift | 56 ++-
.../airavata-data-models/task_model.thrift | 4 +-
18 files changed, 706 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/airavata_api.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/airavata_api.thrift b/thrift-interface-descriptions/airavata-api/airavata_api.thrift
index fc85c1e..12a4773 100644
--- a/thrift-interface-descriptions/airavata-api/airavata_api.thrift
+++ b/thrift-interface-descriptions/airavata-api/airavata_api.thrift
@@ -26,6 +26,7 @@
include "airavata_errors.thrift"
include "airavata_data_models.thrift"
+include "status_models.thrift"
include "experiment_model.thrift"
include "workspace_model.thrift"
include "compute_resource_model.thrift"
@@ -312,7 +313,7 @@ service Airavata {
* Instead use searchExperimentsByNameWithPagination
*
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByName (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByName (1: required string gatewayId,
2: required string userName, 3: required string expName)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
@@ -333,7 +334,7 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByNameWithPagination (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByNameWithPagination (1: required string gatewayId,
2: required string userName, 3: required string expName, 4: required i32 limit,
5: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
@@ -352,7 +353,7 @@ service Airavata {
* @deprecated
* Instead use searchExperimentsByDescWithPagination
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByDesc (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByDesc (1: required string gatewayId,
2: required string userName, 3: required string description)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
@@ -373,7 +374,7 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByDescWithPagination (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByDescWithPagination (1: required string gatewayId,
2: required string userName, 3: required string description, 4: required i32 limit,
5: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
@@ -393,7 +394,7 @@ service Airavata {
* @deprecated
* Instead use searchExperimentsByApplicationWithPagination
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByApplication (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByApplication (1: required string gatewayId,
2: required string userName, 3: required string applicationId)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
@@ -413,7 +414,7 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByApplicationWithPagination (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByApplicationWithPagination (1: required string gatewayId,
2: required string userName, 3: required string applicationId, 4: required i32 limit,
5: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
@@ -431,8 +432,8 @@ service Airavata {
* @deprecated
* Instead use searchExperimentsByStatusWithPagination
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByStatus (1: required string gatewayId,
- 2: required string userName, 3: required experiment_model.ExperimentState experimentState)
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByStatus (1: required string gatewayId,
+ 2: required string userName, 3: required status_models.ExperimentState experimentState)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
3: airavata_errors.AiravataSystemException ase)
@@ -452,8 +453,8 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByStatusWithPagination (1: required string gatewayId,
- 2: required string userName, 3: required experiment_model.ExperimentState experimentState,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByStatusWithPagination (1: required string gatewayId,
+ 2: required string userName, 3: required status_models.ExperimentState experimentState,
4: required i32 limit, 5: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
@@ -473,7 +474,7 @@ service Airavata {
* @deprecated
* Instead use searchExperimentsByCreationTimeWithPagination
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByCreationTime (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByCreationTime (1: required string gatewayId,
2: required string userName, 3: required i64 fromTime, 4: required i64 toTime)
throws (1: airavata_errors.InvalidRequestException ire,
2: airavata_errors.AiravataClientException ace,
@@ -496,7 +497,7 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperimentsByCreationTimeWithPagination (1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperimentsByCreationTimeWithPagination (1: required string gatewayId,
2: required string userName, 3: required i64 fromTime, 4: required i64 toTime,
5: required i32 limit, 6: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
@@ -518,7 +519,7 @@ service Airavata {
* @param offset
* The starting point of the results to be fetched
*/
- list<experiment_model.ExperimentSummary> searchExperiments(1: required string gatewayId,
+ list<experiment_model.ExperimentSummaryModel> searchExperiments(1: required string gatewayId,
2: required string userName, 3: map<experiment_model.ExperimentSearchFields, string> filters,
4: required i32 limit, 5: required i32 offset)
throws (1: airavata_errors.InvalidRequestException ire,
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/airavata_commons.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/airavata_commons.thrift b/thrift-interface-descriptions/airavata-api/airavata_commons.thrift
new file mode 100644
index 0000000..e49fe3f
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/airavata_commons.thrift
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace java org.apache.airavata.model.commons
+namespace php Airavata.Model.Commons
+namespace cpp apache.airavata.model.commons
+namespace py apache.airavata.model.commons
+
+const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
+
+struct ErrorModel {
+ 1: required string errorId = DEFAULT_ID,
+ 2: optional i64 creationTime,
+ 3: optional string actualErrorMessage,
+ 4: optional string userFriendlyMessage,
+ 5: optional bool transientOrPersistent = 0,
+ 6: optional list<string> rootCauseErrorIdList
+}
+
+
+/**
+* This data structure can be used to store the validation results
+* captured during validation step and during the launchExperiment
+* operation it can be easilly checked to see the errors occured
+* during the experiment launch operation
+**/
+
+struct ValidatorResult {
+ 1: required bool result,
+ 2: optional string errorDetails
+}
+
+
+struct ValidationResults {
+ 1: required bool validationState,
+ 2: required list<ValidatorResult> validationResultList
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/airavata_errors.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/airavata_errors.thrift b/thrift-interface-descriptions/airavata-api/airavata_errors.thrift
index e0849a1..c445ec1 100644
--- a/thrift-interface-descriptions/airavata-api/airavata_errors.thrift
+++ b/thrift-interface-descriptions/airavata-api/airavata_errors.thrift
@@ -139,21 +139,6 @@ exception AiravataClientException {
2: optional string parameter
}
-struct ValidatorResult {
- 1: required bool result,
- 2: optional string errorDetails
-}
-
-struct ValidationResults {
- 1: required bool validationState,
- 2: required list<ValidatorResult> validationResultList
-}
-
-exception LaunchValidationException {
- 1: required ValidationResults validationResult;
- 2: optional string errorMessage;
-}
-
/**
* This exception is thrown by Airavata Services when a call fails as a result of
* a problem in the service that could not be changed through client's action.
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/application_interface_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/application_interface_model.thrift b/thrift-interface-descriptions/airavata-api/application_interface_model.thrift
index 4c57009..c7ca231 100644
--- a/thrift-interface-descriptions/airavata-api/application_interface_model.thrift
+++ b/thrift-interface-descriptions/airavata-api/application_interface_model.thrift
@@ -23,6 +23,7 @@
* to application mapping on various resources.
*
*/
+include "application_io_models.thrift"
namespace java org.apache.airavata.model.appcatalog.appinterface
namespace php Airavata.Model.AppCatalog.AppInterface
@@ -32,97 +33,6 @@ namespace py apache.airavata.model.appcatalog.appinterface
const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
/**
- * Data Types supported in Airavata. The primitive data types
- *
-*/
-enum DataType{
- STRING,
- INTEGER,
- FLOAT,
- URI,
- STDOUT,
- STDERR
-}
-
-/**
- * Application Inputs. The paramters describe how inputs are passed to the application.
- *
- * name:
- * Name of the parameter.
- *
- * value:
- * Value of the parameter. A default value could be set during registration.
- *
- * type:
- * Data type of the parameter
- *
- * applicationArguement:
- * The argument flag sent to the application. Such as -p pressure.
- *
- * standardInput:
- * When this value is set, the parameter is sent as standard input rather than a parameter.
- * Typically this is passed using redirection operator ">".
- *
- * userFriendlyDescription:
- * Description to be displayed at the user interface.
- *
- * metaData:
- * Any metadat. This is typically ignore by Airavata and is used by gateways for application configuration.
- *
-*/
-struct InputDataObjectType {
- 1: required string name,
- 2: optional string value,
- 3: optional DataType type,
- 4: optional string applicationArgument,
- 5: optional bool standardInput = 0,
- 6: optional string userFriendlyDescription,
- 7: optional string metaData,
- 8: optional i32 inputOrder,
- 9: optional bool isRequired,
- 10: optional bool requiredToAddedToCommandLine,
- 11: optional bool dataStaged = 0
-}
-
-/**
- * Application Outputs. The paramters describe how outputs generated by the application.
- *
- * name:
- * Name of the parameter.
- *
- * value:
- * Value of the parameter.
- *
- * type:
- * Data type of the parameter
- *
- * applicationArguement:
- * The argument flag sent to the application. Such as -p pressure.
- *
- * standardInput:
- * When this value is set, the parameter is sent as standard input rather than a parameter.
- * Typically this is passed using redirection operator ">".
- *
- * userFriendlyDescription:
- * Description to be displayed at the user interface.
- *
- * metaData:
- * Any metadat. This is typically ignore by Airavata and is used by gateways for application configuration.
- *
-*/
-struct OutputDataObjectType {
- 1: required string name,
- 2: optional string value,
- 3: optional DataType type,
- 4: optional string applicationArgument,
- 5: optional bool isRequired,
- 6: optional bool requiredToAddedToCommandLine,
- 7: optional bool dataMovement,
- 8: optional string location,
- 9: optional string searchQuery
-}
-
-/**
* Application Interface Description
*
* applicationModules:
@@ -141,6 +51,6 @@ struct ApplicationInterfaceDescription {
2: required string applicationName,
3: optional string applicationDescription,
4: optional list<string> applicationModules,
- 5: optional list<InputDataObjectType> applicationInputs,
- 6: optional list<OutputDataObjectType> applicationOutputs
+ 5: optional list<application_io_models.InputDataObjectType> applicationInputs,
+ 6: optional list<application_io_models.OutputDataObjectType> applicationOutputs
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/application_io_models.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/application_io_models.thrift b/thrift-interface-descriptions/airavata-api/application_io_models.thrift
new file mode 100644
index 0000000..4a5b9ef
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/application_io_models.thrift
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+include "compute_resource_model.thrift"
+include "application_interface_model.thrift"
+
+namespace java org.apache.airavata.model.application.io
+namespace php Airavata.Model.Application.Io
+namespace cpp apache.airavata.model.application.io
+namespace py apache.airavata.model.application.io
+
+/**
+ * Data Types supported in Airavata. The primitive data types
+ *
+*/
+enum DataType{
+ STRING,
+ INTEGER,
+ FLOAT,
+ URI,
+ STDOUT,
+ STDERR
+}
+
+/**
+ * Application Inputs. The paramters describe how inputs are passed to the application.
+ *
+ * name:
+ * Name of the parameter.
+ *
+ * value:
+ * Value of the parameter. A default value could be set during registration.
+ *
+ * type:
+ * Data type of the parameter
+ *
+ * applicationArguement:
+ * The argument flag sent to the application. Such as -p pressure.
+ *
+ * standardInput:
+ * When this value is set, the parameter is sent as standard input rather than a parameter.
+ * Typically this is passed using redirection operator ">".
+ *
+ * userFriendlyDescription:
+ * Description to be displayed at the user interface.
+ *
+ * metaData:
+ * Any metadat. This is typically ignore by Airavata and is used by gateways for application configuration.
+ *
+*/
+struct InputDataObjectType {
+ 1: required string name,
+ 2: optional string value,
+ 3: optional DataType type,
+ 4: optional string applicationArgument,
+ 5: optional bool standardInput = 0,
+ 6: optional string userFriendlyDescription,
+ 7: optional string metaData,
+ 8: optional i32 inputOrder,
+ 9: optional bool isRequired,
+ 10: optional bool requiredToAddedToCommandLine,
+ 11: optional bool dataStaged = 0
+}
+
+/**
+ * Application Outputs. The paramters describe how outputs generated by the application.
+ *
+ * name:
+ * Name of the parameter.
+ *
+ * value:
+ * Value of the parameter.
+ *
+ * type:
+ * Data type of the parameter
+ *
+ * applicationArguement:
+ * The argument flag sent to the application. Such as -p pressure.
+ *
+ * standardInput:
+ * When this value is set, the parameter is sent as standard input rather than a parameter.
+ * Typically this is passed using redirection operator ">".
+ *
+ * userFriendlyDescription:
+ * Description to be displayed at the user interface.
+ *
+ * metaData:
+ * Any metadat. This is typically ignore by Airavata and is used by gateways for application configuration.
+ *
+*/
+struct OutputDataObjectType {
+ 1: required string name,
+ 2: optional string value,
+ 3: optional DataType type,
+ 4: optional string applicationArgument,
+ 5: optional bool isRequired,
+ 6: optional bool requiredToAddedToCommandLine,
+ 7: optional bool dataMovement,
+ 8: optional string location,
+ 9: optional string searchQuery
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/experiment_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/experiment_model.thrift b/thrift-interface-descriptions/airavata-api/experiment_model.thrift
index 8896f0d..af577c5 100644
--- a/thrift-interface-descriptions/airavata-api/experiment_model.thrift
+++ b/thrift-interface-descriptions/airavata-api/experiment_model.thrift
@@ -18,228 +18,19 @@
*
*/
-include "compute_resource_model.thrift"
-include "application_interface_model.thrift"
+ include "application_io_models.thrift"
+ include "scheduling_model.thrift"
+ include "airavata_commons.thrift"
+ include "status_models.thrift"
-namespace java org.apache.airavata.model.workspace.experiment
-namespace php Airavata.Model.Workspace.Experiment
-namespace cpp apache.airavata.model.workspace.experiment
-namespace py apache.airavata.model.workspace.experiment
+ namespace java org.apache.airavata.model.experiment
+ namespace php Airavata.Model.Experiment
+ namespace cpp apache.airavata.model.experiment
+ namespace py apache.airavata.model.experiment
-/*
- * This file describes the definitions of the.airavata.registry.core.experiment.Data Structures. Each of the
- * language specific Airavata Client SDK's will translate this neutral data model into an
- * appropriate form for passing to the Airavata Server Execution API Calls.
- *
- * The Experiment data model is divided into 6 categories: experiment metadata, experiment configuration
- * data, experiment generated data, experiment monitoring data, provenance data and error handling data.
- *
- * Experiment Metadata:
- * this structure holds the owner of the experiment, name, description, creation and last update times,
- * last known status, and if is private to the user or shared publicly.
- * FIXME: To start with, we will not define this but populate it inferring data from other structures. This
- * structure needs revisiting once the API gets used.
- *
- * Experiment Configuration Data:
- * this structure will contain all user provided configuration data.
- *
- * Experiment Generated Data:
- * this structure describes all intermediate and output data generated by executing the experiment.
- *
- * Experiment Monitoring Data:
- * this structure contains fine grained experiment status information.
- *
- * Experiment Summary Data:
- * this is derived information from all experiment objects to provide a quick summary.
- *
-*/
-
-const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
-const string DEFAULT_PROJECT_NAME = "DEFAULT"
-const string SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE"
-
-enum ExperimentState {
- CREATED,
- VALIDATED,
- SCHEDULED,
- LAUNCHED,
- EXECUTING,
- CANCELING,
- CANCELED,
- SUSPENDED,
- COMPLETED,
- FAILED,
- UNKNOWN
-}
-
-enum ExperimentSearchFields {
- EXPERIMENT_NAME,
- EXPERIMENT_DESC,
- APPLICATION_ID,
- FROM_DATE,
- TO_DATE,
- STATUS
-}
-
-struct ExperimentStatus {
- 1: required ExperimentState experimentState,
- 2: optional i64 timeOfStateChange
-}
-
-enum WorkflowNodeState {
- INVOKED,
- EXECUTING,
- CANCELING,
- CANCELED,
- SUSPENDED,
- COMPLETED,
- FAILED,
- UNKNOWN
-}
-
-struct WorkflowNodeStatus {
- 1: required WorkflowNodeState workflowNodeState,
- 2: optional i64 timeOfStateChange
-}
-
-enum TaskState {
- WAITING,
- STARTED,
- PRE_PROCESSING,
- CONFIGURING_WORKSPACE,
- INPUT_DATA_STAGING,
- OUTPUT_DATA_STAGING,
- POST_PROCESSING,
- EXECUTING,
- CANCELING,
- CANCELED,
- COMPLETED,
- FAILED,
- UNKNOWN
-}
-
-struct TaskStatus {
- 1: required TaskState executionState,
- 2: optional i64 timeOfStateChange
-}
-
-enum JobState {
- SUBMITTED,
- UN_SUBMITTED,
- SETUP,
- QUEUED,
- ACTIVE,
- COMPLETE,
- CANCELING,
- CANCELED,
- FAILED,
- HELD,
- SUSPENDED,
- UNKNOWN
-}
-
-struct JobStatus {
- 1: required JobState jobState,
- 2: optional i64 timeOfStateChange
-}
-
-enum TransferState {
- DIRECTORY_SETUP,
- UPLOAD,
- DOWNLOAD,
- ACTIVE,
- COMPLETE,
- STDOUT_DOWNLOAD,
- STDERROR_DOWNLOAD,
- CANCELING,
- CANCELED,
- FAILED,
- HELD,
- SUSPENDED,
- UNKNOWN
-}
-
-struct TransferStatus {
- 1: required TransferState transferState,
- 2: optional i64 timeOfStateChange
-}
-
-struct ApplicationStatus {
- 1: required string applicationState,
- 2: optional i64 timeOfStateChange
-}
-
-enum ActionableGroup {
- RESOURCE_ADMINS,
- AIRAVATA_ADMINS,
- GATEWAYS_ADMINS,
- USER,
- CANNOT_BE_DETERMINED
-}
-
-enum ErrorCategory {
- FILE_SYSTEM_FAILURE,
- APPLICATION_FAILURE,
- RESOURCE_NODE_FAILURE,
- DISK_FULL,
- INSUFFICIENT_ALLOCATION,
- SYSTEM_MAINTENANCE,
- AIRAVATA_INTERNAL_ERROR,
- CANNOT_BE_DETERMINED
-}
-
-enum CorrectiveAction {
- RETRY_SUBMISSION,
- CONTACT_SUPPORT,
- CANNOT_BE_DETERMINED
-}
-
-/**
- * A structure holding the Computational Resource Scheduling.
- *
-*/
-struct ComputationalResourceScheduling {
- 1: optional string resourceHostId,
- 2: optional i32 totalCPUCount,
- 3: optional i32 nodeCount,
- 4: optional i32 numberOfThreads,
- 5: optional string queueName,
- 6: optional i32 wallTimeLimit,
- 7: optional i32 jobStartTime,
- 8: optional i32 totalPhysicalMemory,
- 9: optional string computationalProjectAccount,
- 10: optional string chassisName
-}
-
-/**
- * A structure holding specified input data handling.
- *
-*/
-struct AdvancedInputDataHandling {
- 1: optional bool stageInputFilesToWorkingDir = 0,
- 2: optional string parentWorkingDirectory,
- 3: optional string uniqueWorkingDirectory,
- 4: optional bool cleanUpWorkingDirAfterJob = 0
-}
-
-/**
- * A structure holding specified output data handling.
- *
-*/
-struct AdvancedOutputDataHandling {
- 2: optional string outputDataDir,
- 3: optional string dataRegistryURL,
- 4: optional bool persistOutputData = 1
-}
-
-/**
- * A structure holding Quality of Service Parameters.
- *
-*/
-struct QualityOfServiceParams {
- 1: optional string startExecutionAt,
- 2: optional string executeBefore,
- 3: optional i32 numberofRetries
+enum ExperimentType {
+ SINGLE_APPLICATION,
+ WORKFLOW
}
/**
@@ -247,118 +38,17 @@ struct QualityOfServiceParams {
*
*
*/
-struct UserConfigurationData {
+struct UserConfigurationDataModel {
1: required bool airavataAutoSchedule = 0,
2: required bool overrideManualScheduledParams = 0,
3: optional bool shareExperimentPublicly = 0,
- 4: optional ComputationalResourceScheduling computationalResourceScheduling,
- 5: optional AdvancedInputDataHandling advanceInputDataHandling,
- 6: optional AdvancedOutputDataHandling advanceOutputDataHandling,
- 7: optional QualityOfServiceParams qosParams,
- 8: optional bool throttleResources = 0,
- 9: optional string userDN,
- 10: optional bool generateCert = 0
-}
-
-struct ErrorDetails {
- 1: required string errorID = DEFAULT_ID,
- 2: optional i64 creationTime,
- 3: optional string actualErrorMessage,
- 4: optional string userFriendlyMessage,
- 5: optional ErrorCategory errorCategory,
- 6: optional bool transientOrPersistent = 0,
- 7: optional CorrectiveAction correctiveAction,
- 8: optional ActionableGroup actionableGroup,
- 9: optional list<string> rootCauseErrorIdList
-}
-
-struct JobDetails {
- 1: required string jobID = DEFAULT_ID,
- 2: required string jobDescription,
- 3: optional i64 creationTime,
- 4: optional JobStatus jobStatus,
- 5: optional ApplicationStatus applicationStatus,
- 6: optional list<ErrorDetails> errors,
- 7: optional string computeResourceConsumed,
- 8: optional string jobName,
- 9: optional string workingDir
-}
-
-struct DataTransferDetails {
- 1: required string transferID = DEFAULT_ID,
- 2: optional i64 creationTime,
- 3: required string transferDescription,
- 4: optional TransferStatus transferStatus,
+ 4: optional scheduling_model.ComputationalResourceSchedulingModel computationalResourceScheduling,
+ 5: optional bool throttleResources = 0,
+ 6: optional string userDN,
+ 7: optional bool generateCert = 0
}
/**
- * A structure holding the actual execution context decided based on user provided configuration data or system inferred
- * information from scheduling and QoS parameters. One experiment can have multiple tasks. Each tasks results in
- * data transfers and jobs
- *
-*/
-struct TaskDetails {
- 1: required string taskID = DEFAULT_ID,
- 2: optional i64 creationTime,
- 3: optional string applicationId,
- 4: optional string applicationVersion,
- 5: optional string applicationDeploymentId,
- 6: optional list<application_interface_model.InputDataObjectType> applicationInputs,
- 7: optional list<application_interface_model.OutputDataObjectType> applicationOutputs,
- 8: optional ComputationalResourceScheduling taskScheduling,
- 9: optional AdvancedInputDataHandling advancedInputDataHandling,
- 10: optional AdvancedOutputDataHandling advancedOutputDataHandling,
- 11: optional TaskStatus taskStatus,
- 12: optional list<JobDetails> jobDetailsList,
- 13: optional list<DataTransferDetails> dataTransferDetailsList,
- 14: optional list<ErrorDetails> errors,
- 15: optional bool enableEmailNotification,
- 16: optional list<string> emailAddresses,
-}
-
-enum ExecutionUnit {
- INPUT,
- APPLICATION,
- OUTPUT,
- OTHER
-}
-
-
-/**
-* A structure holding the node data.
-* nodeInstanceId - unique node identifier for each run
-*/
-struct WorkflowNodeDetails {
- 1: required string nodeInstanceId = DEFAULT_ID,
- 2: optional i64 creationTime,
- 3: required string nodeName = SINGLE_APP_NODE_NAME,
- 4: required ExecutionUnit executionUnit = ExecutionUnit.APPLICATION,
- 5: optional string executionUnitData,
- 6: optional list<application_interface_model.InputDataObjectType> nodeInputs,
- 7: optional list<application_interface_model.OutputDataObjectType> nodeOutputs,
- 8: optional WorkflowNodeStatus workflowNodeStatus,
- 9: optional list<TaskDetails> taskDetailsList,
- 10: optional list<ErrorDetails> errors
-}
-
-/**
-* This data structure can be used to store the validation results
-* captured during validation step and during the launchExperiment
-* operation it can be easilly checked to see the errors occured
-* during the experiment launch operation
-**/
-
-struct ValidatorResult {
- 1: required bool result,
- 2: optional string errorDetails
-}
-
-
-struct ValidationResults {
- 1: required bool validationState,
- 2: required list<ValidatorResult> validationResultList
-}
-/**
* A structure holding the experiment metadata and its child models.
*
* userName:
@@ -375,39 +65,34 @@ struct ValidationResults {
* The verbose description of the experiment. This is an optional parameter.
*/
-struct Experiment {
- 1: required string experimentID = DEFAULT_ID,
- 2: required string projectID = DEFAULT_PROJECT_NAME,
- 3: optional i64 creationTime,
+struct ExperimentModel {
+ 1: required string experimentId = DEFAULT_ID,
+ 2: required string projectId = DEFAULT_PROJECT_NAME,
+ 3: required ExperimentType experimentType = ExperimentType.SINGLE_APPLICATION;
4: required string userName,
- 5: required string name,
- 6: optional string description,
- 7: optional string applicationId,
- 8: optional string applicationVersion,
- 9: optional string workflowTemplateId,
- 10: optional string workflowTemplateVersion,
- 11: optional string gatewayExecutionId,
- 12: optional bool enableEmailNotification,
- 13: optional list<string> emailAddresses,
- 14: optional UserConfigurationData userConfigurationData,
- 15: optional string workflowExecutionInstanceId,
- 16: optional list<application_interface_model.InputDataObjectType> experimentInputs,
- 17: optional list<application_interface_model.OutputDataObjectType> experimentOutputs,
- 18: optional ExperimentStatus experimentStatus,
- 19: optional list<WorkflowNodeStatus> stateChangeList,
- 20: optional list<WorkflowNodeDetails> workflowNodeDetailsList,
- 21: optional list<ErrorDetails> errors
-}
-
-struct ExperimentSummary {
- 1: required string experimentID,
- 2: required string projectID,
+ 5: required string experimentName,
+ 6: optional i64 creationTime,
+ 7: optional string description,
+ 8: optional string executionId,
+ 9: optional string gatewayExecutionId,
+ 10: optional bool enableEmailNotification,
+ 11: optional list<string> emailAddresses,
+ 12: optional UserConfigurationDataModel userConfigurationData,
+ 13: optional list<application_io_models.InputDataObjectType> experimentInputs,
+ 14: optional list<application_io_models.OutputDataObjectType> experimentOutputs,
+ 15: optional status_models.ExperimentStatus experimentStatus,
+ 16: optional list<airavata_commons.ErrorModel> errors
+}
+
+struct ExperimentSummaryModel {
+ 1: required string experimentId,
+ 2: required string projectId,
3: optional i64 creationTime,
4: required string userName,
5: required string name,
6: optional string description,
7: optional string applicationId,
- 8: optional ExperimentStatus experimentStatus,
+ 8: optional status_models.ExperimentStatus experimentStatus,
}
struct ExperimentStatistics {
@@ -415,8 +100,8 @@ struct ExperimentStatistics {
2: required i32 completedExperimentCount,
3: optional i32 cancelledExperimentCount,
4: required i32 failedExperimentCount,
- 5: required list<ExperimentSummary> allExperiments,
- 6: optional list<ExperimentSummary> completedExperiments,
- 7: optional list<ExperimentSummary> failedExperiments,
- 8: optional list<ExperimentSummary> cancelledExperiments,
+ 5: required list<ExperimentSummaryModel> allExperiments,
+ 6: optional list<ExperimentSummaryModel> completedExperiments,
+ 7: optional list<ExperimentSummaryModel> failedExperiments,
+ 8: optional list<ExperimentSummaryModel> cancelledExperiments,
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/messaging_events.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/messaging_events.thrift b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
index 2f23850..3c55e92 100644
--- a/thrift-interface-descriptions/airavata-api/messaging_events.thrift
+++ b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
@@ -18,8 +18,8 @@
*
*/
-include "experiment_model.thrift"
-include "application_interface_model.thrift"
+include "status_models.thrift"
+include "application_io_models.thrift"
namespace java org.apache.airavata.model.messaging.event
namespace php Airavata.Model.Messaging.Event
@@ -46,22 +46,11 @@ enum MessageType {
}
struct ExperimentStatusChangeEvent {
- 1: required experiment_model.ExperimentState state;
+ 1: required status_models.ExperimentState state;
2: required string experimentId;
3: required string gatewayId;
}
-struct WorkflowIdentifier {
- 1: required string workflowNodeId;
- 2: required string experimentId;
- 3: required string gatewayId;
-}
-
-struct WorkflowNodeStatusChangeEvent {
- 1: required experiment_model.WorkflowNodeState state;
- 2: required WorkflowIdentifier workflowNodeIdentity;
-}
-
struct TaskIdentifier {
1: required string taskId;
2: required string workflowNodeId;
@@ -70,17 +59,17 @@ struct TaskIdentifier {
}
struct TaskStatusChangeEvent {
- 1: required experiment_model.TaskState state;
+ 1: required status_models.TaskState state;
2: required TaskIdentifier taskIdentity;
}
struct TaskStatusChangeRequestEvent {
- 1: required experiment_model.TaskState state;
+ 1: required status_models.TaskState state;
2: required TaskIdentifier taskIdentity;
}
struct TaskOutputChangeEvent {
- 1: required list<application_interface_model.OutputDataObjectType> output;
+ 1: required list<application_io_models.OutputDataObjectType> output;
2: required TaskIdentifier taskIdentity;
}
@@ -124,12 +113,12 @@ struct TaskTerminateEvent{
}
struct JobStatusChangeEvent {
- 1: required experiment_model.JobState state;
+ 1: required status_models.JobState state;
2: required JobIdentifier jobIdentity;
}
struct JobStatusChangeRequestEvent {
- 1: required experiment_model.JobState state;
+ 1: required status_models.JobState state;
2: required JobIdentifier jobIdentity;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/process_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/process_model.thrift b/thrift-interface-descriptions/airavata-api/process_model.thrift
new file mode 100644
index 0000000..c3f8ba3
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/process_model.thrift
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+include "airavata_commons.thrift"
+include "status_models.thrift"
+include "task_model.thrift"
+include "application_io_models.thrift"
+include "scheduling_model.thrift"
+
+namespace java org.apache.airavata.model.process
+namespace php Airavata.Model.Process
+namespace cpp apache.airavata.model.process
+namespace py apache.airavata.model.process
+
+
+/**
+ * ProcessModel: A structure holding the process details. The infromation is derived based on user provided
+ * configuration data or system inferred information from scheduling and QoS parameters.
+ *
+ * processDetail:
+ * A friendly description of the process, usally used to communicate information to users.
+ *
+ *
+*/
+struct ProcessModel {
+ 1: required string processId = airavata_commons.DEFAULT_ID,
+ 2: required string experimentId,
+ 3: optional i64 creationTime,
+ 4: optional i64 lastUpdateTime,
+ 5: optional status_models.ProcessStatus processStatus,
+ 6: optional string processDetail,
+ 7: optional string applicationInterfaceId,
+ 8: optional list<application_io_models.InputDataObjectType> processInputs,
+ 9: optional list<application_io_models.OutputDataObjectType> processOutputs,
+ 10: optional scheduling_model.ComputationalResourceSchedulingModel resourceSchedule,
+ 11: optional list<task_model.TaskModel> tasks,
+ 12: optional string taskDag,
+ 13: optional airavata_commons.ErrorModel processError
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/scheduling_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/scheduling_model.thrift b/thrift-interface-descriptions/airavata-api/scheduling_model.thrift
new file mode 100644
index 0000000..59c6abd
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/scheduling_model.thrift
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace java org.apache.airavata.model.scheduling
+namespace php Airavata.Model.Scheduling
+namespace cpp apache.airavata.model.scheduling
+namespace py apache.airavata.model.scheduling
+
+/**
+ * ComputationalResourceSchedulingModel:
+ *
+ *
+*/
+struct ComputationalResourceSchedulingModel {
+ 1: optional string resourceHostId,
+ 2: optional i32 totalCPUCount,
+ 3: optional i32 nodeCount,
+ 4: optional i32 numberOfThreads,
+ 5: optional string queueName,
+ 6: optional i32 wallTimeLimit,
+ 7: optional i32 totalPhysicalMemory,
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/status_models.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/status_models.thrift b/thrift-interface-descriptions/airavata-api/status_models.thrift
new file mode 100644
index 0000000..d361f95
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/status_models.thrift
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace java org.apache.airavata.model.status
+namespace php Airavata.Model.Status
+namespace cpp apache.airavata.model.status
+namespace py apache.airavata.model.status
+
+enum ExperimentState {
+ CREATED,
+ VALIDATED,
+ SCHEDULED,
+ LAUNCHED,
+ EXECUTING,
+ CANCELING,
+ CANCELED,
+ COMPLETED,
+ FAILED
+}
+
+enum TaskState {
+ CREATED,
+ EXECUTING,
+ COMPLETED,
+ FAILED,
+ CANCELED
+}
+
+enum ProcessState {
+ CREATED,
+ VALIDATED,
+ EXECUTING,
+ COMPLETED,
+ FAILED,
+ CANCELLING,
+ CANCELED
+}
+
+enum JobState {
+ SUBMITTED,
+ QUEUED,
+ ACTIVE,
+ COMPLETE,
+ CANCELED,
+ FAILED,
+ SUSPENDED,
+ UNKNOWN
+}
+
+/**
+ * Status: A generic status object.
+ *
+ * state:
+ * State .
+ *
+ * timeOfStateChange:
+ * time the status was last updated.
+ *
+ * reason:
+ * User friendly reason on how the state is inferred.
+ *
+*/
+struct ExperimentStatus {
+ 1: required ExperimentState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct ProcessStatus {
+ 1: required ProcessState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct TaskStatus {
+ 1: required TaskState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct JobStatus {
+ 1: required JobState jobState,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/task_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/task_model.thrift b/thrift-interface-descriptions/airavata-api/task_model.thrift
new file mode 100644
index 0000000..43f89e9
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-api/task_model.thrift
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+include "airavata_commons.thrift"
+include "status_models.thrift"
+
+namespace java org.apache.airavata.model.task
+namespace php Airavata.Model.Task
+namespace cpp apache.airavata.model.task
+namespace py apache.airavata.model.task
+
+/**
+ * TaskTypes: An enumerated list of TaskTypes. Task being generic, the task type will provide the concrete interpretation.
+ *
+*/
+enum TaskTypes {
+ ENV_SETUP,
+ DATA_STAGING,
+ JOB_SUBMISSION,
+ ENV_CLEANUP,
+ MONITORING
+}
+
+/**
+ * TaskModel: A structure holding the generic task details.
+ *
+ * taskDetail:
+ * A friendly description of the task, usally used to communicate information to users.
+ *
+ * taskInternalStore:
+ * A generic byte object for the Task developer to store internal serialized data into registry catalogs.
+*/
+struct TaskModel {
+ 1: required string taskId = airavata_commons.DEFAULT_ID,
+ 2: required TaskTypes taskType,
+ 3: required string parentProcessId,
+ 4: required i64 creationTime,
+ 5: required i64 lastUpdateTime,
+ 6: required status_models.TaskStatus taskStatus,
+ 7: optional string taskDetail,
+ 8: optional byte taskInternalStore,
+ 9: optional airavata_commons.ErrorModel taskError
+}
+
+/**
+ * DataStagingTaskModel: A structure holding the data staging task details.
+ *
+ * Source and Destination locations includes standard representation of protocol, host, port and path
+ * A friendly description of the task, usally used to communicate information to users.
+ *
+*/
+struct DataStagingTaskModel {
+ 1: required string source,
+ 2: required string destination,
+ 3: optional i64 transferStartTime,
+ 4: optional i64 transferEndTime,
+ 5: optional string transferRate
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/workflow_data_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/workflow_data_model.thrift b/thrift-interface-descriptions/airavata-api/workflow_data_model.thrift
index 920aef3..e35a951 100644
--- a/thrift-interface-descriptions/airavata-api/workflow_data_model.thrift
+++ b/thrift-interface-descriptions/airavata-api/workflow_data_model.thrift
@@ -23,7 +23,7 @@ namespace java org.apache.airavata.model
namespace php Airavata.Model
namespace py apache.airavata.model.workflow
-include "application_interface_model.thrift"
+include "application_io_models.thrift"
/*
* This file describes the definitions of the Airavata Execution Data Structures. Each of the
@@ -38,6 +38,6 @@ struct Workflow {
2: required string name,
3: optional string graph,
4: optional binary image,
- 5: optional list<application_interface_model.InputDataObjectType> workflowInputs,
- 6: optional list<application_interface_model.OutputDataObjectType> workflowOutputs
+ 5: optional list<application_io_models.InputDataObjectType> workflowInputs,
+ 6: optional list<application_io_models.OutputDataObjectType> workflowOutputs
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-api/workspace_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/workspace_model.thrift b/thrift-interface-descriptions/airavata-api/workspace_model.thrift
index 46ed1c6..9ccd6e5 100644
--- a/thrift-interface-descriptions/airavata-api/workspace_model.thrift
+++ b/thrift-interface-descriptions/airavata-api/workspace_model.thrift
@@ -18,8 +18,6 @@
*
*/
-include "experiment_model.thrift"
-
namespace java org.apache.airavata.model.workspace
namespace php Airavata.Model.Workspace
namespace cpp apache.airavata.model.workspace
@@ -41,7 +39,7 @@ struct Group {
}
struct Project {
- 1: required string projectID = experiment_model.DEFAULT_PROJECT_NAME,
+ 1: required string projectID,
2: required string owner,
3: required string name,
4: optional string description
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-data-models/airavata_commons.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-data-models/airavata_commons.thrift b/thrift-interface-descriptions/airavata-data-models/airavata_commons.thrift
index 90a4132..e49fe3f 100644
--- a/thrift-interface-descriptions/airavata-data-models/airavata_commons.thrift
+++ b/thrift-interface-descriptions/airavata-data-models/airavata_commons.thrift
@@ -25,11 +25,30 @@ namespace py apache.airavata.model.commons
const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
-struct ErrorDetails {
- 1: required string errorID = DEFAULT_ID,
+struct ErrorModel {
+ 1: required string errorId = DEFAULT_ID,
2: optional i64 creationTime,
3: optional string actualErrorMessage,
4: optional string userFriendlyMessage,
5: optional bool transientOrPersistent = 0,
6: optional list<string> rootCauseErrorIdList
+}
+
+
+/**
+* This data structure can be used to store the validation results
+* captured during validation step and during the launchExperiment
+* operation it can be easilly checked to see the errors occured
+* during the experiment launch operation
+**/
+
+struct ValidatorResult {
+ 1: required bool result,
+ 2: optional string errorDetails
+}
+
+
+struct ValidationResults {
+ 1: required bool validationState,
+ 2: required list<ValidatorResult> validationResultList
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-data-models/experiment_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-data-models/experiment_model.thrift b/thrift-interface-descriptions/airavata-data-models/experiment_model.thrift
new file mode 100644
index 0000000..6cf0e75
--- /dev/null
+++ b/thrift-interface-descriptions/airavata-data-models/experiment_model.thrift
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ include "application_io_models.thrift"
+ include "scheduling_model.thrift"
+ include "airavata_commons.thrift"
+ include "status_models.thrift"
+
+ namespace java org.apache.airavata.model.experiment
+ namespace php Airavata.Model.Experiment
+ namespace cpp apache.airavata.model.experiment
+ namespace py apache.airavata.model.experiment
+
+enum ExperimentType {
+ SINGLE_APPLICATION,
+ WORKFLOW
+}
+
+enum ExperimentSearchFields {
+ EXPERIMENT_NAME,
+ EXPERIMENT_DESC,
+ APPLICATION_ID,
+ FROM_DATE,
+ TO_DATE,
+ STATUS
+}
+/**
+ * A structure holding the experiment configuration.
+ *
+ *
+*/
+struct UserConfigurationDataModel {
+ 1: required bool airavataAutoSchedule = 0,
+ 2: required bool overrideManualScheduledParams = 0,
+ 3: optional bool shareExperimentPublicly = 0,
+ 4: optional scheduling_model.ComputationalResourceSchedulingModel computationalResourceScheduling,
+ 5: optional bool throttleResources = 0,
+ 6: optional string userDN,
+ 7: optional bool generateCert = 0
+}
+
+/**
+ * A structure holding the experiment metadata and its child models.
+ *
+ * userName:
+ * The user name of the targeted gateway end user on whose behalf the experiment is being created.
+ * the associated gateway identity can only be inferred from the security hand-shake so as to avoid
+ * authorized Airavata Clients mimicking an unauthorized request. If a gateway is not registered with
+ * Airavata, an authorization exception is thrown.
+ *
+ * experimentName:
+ * The name of the experiment as defined by the user. The name need not be unique as uniqueness is enforced
+ * by the generated experiment id.
+ *
+ * experimentDescription:
+ * The verbose description of the experiment. This is an optional parameter.
+*/
+
+struct ExperimentModel {
+ 1: required string experimentId = DEFAULT_ID,
+ 2: required string projectId = DEFAULT_PROJECT_NAME,
+ 3: required ExperimentType experimentType = ExperimentType.SINGLE_APPLICATION;
+ 4: required string userName,
+ 5: required string experimentName,
+ 6: optional i64 creationTime,
+ 7: optional string description,
+ 8: optional string executionId,
+ 9: optional string gatewayExecutionId,
+ 10: optional bool enableEmailNotification,
+ 11: optional list<string> emailAddresses,
+ 12: optional UserConfigurationDataModel userConfigurationData,
+ 13: optional list<application_io_models.InputDataObjectType> experimentInputs,
+ 14: optional list<application_io_models.OutputDataObjectType> experimentOutputs,
+ 15: optional status_models.ExperimentStatus experimentStatus,
+ 16: optional list<airavata_commons.ErrorModel> errors
+}
+
+struct ExperimentSummaryModel {
+ 1: required string experimentId,
+ 2: required string projectId,
+ 3: optional i64 creationTime,
+ 4: required string userName,
+ 5: required string name,
+ 6: optional string description,
+ 7: optional string applicationId,
+ 8: optional status_models.ExperimentStatus experimentStatus,
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-data-models/process_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-data-models/process_model.thrift b/thrift-interface-descriptions/airavata-data-models/process_model.thrift
index a0051f8..c3f8ba3 100644
--- a/thrift-interface-descriptions/airavata-data-models/process_model.thrift
+++ b/thrift-interface-descriptions/airavata-data-models/process_model.thrift
@@ -41,15 +41,16 @@ namespace py apache.airavata.model.process
*/
struct ProcessModel {
1: required string processId = airavata_commons.DEFAULT_ID,
- 2: optional i64 creationTime,
- 3: optional i64 lastUpdateTime,
- 4: optional status_models.Status processStatus,
- 5: optional string processDetail,
- 6: optional string applicationInterfaceId,
- 7: optional list<application_io_models.InputDataObjectType> processInputs,
- 8: optional list<application_io_models.OutputDataObjectType> processOutputs,
- 9: optional scheduling_model.ComputationalResourceSchedulingModel resourceSchedule,
- 10: optional list<task_model.TaskModel> tasks,
- 11: optional string taskDag,
- 12: optional airavata_commons.ErrorDetails processErrorDetails
+ 2: required string experimentId,
+ 3: optional i64 creationTime,
+ 4: optional i64 lastUpdateTime,
+ 5: optional status_models.ProcessStatus processStatus,
+ 6: optional string processDetail,
+ 7: optional string applicationInterfaceId,
+ 8: optional list<application_io_models.InputDataObjectType> processInputs,
+ 9: optional list<application_io_models.OutputDataObjectType> processOutputs,
+ 10: optional scheduling_model.ComputationalResourceSchedulingModel resourceSchedule,
+ 11: optional list<task_model.TaskModel> tasks,
+ 12: optional string taskDag,
+ 13: optional airavata_commons.ErrorModel processError
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-data-models/status_models.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-data-models/status_models.thrift b/thrift-interface-descriptions/airavata-data-models/status_models.thrift
index 458135c..d361f95 100644
--- a/thrift-interface-descriptions/airavata-data-models/status_models.thrift
+++ b/thrift-interface-descriptions/airavata-data-models/status_models.thrift
@@ -23,7 +23,7 @@ namespace php Airavata.Model.Status
namespace cpp apache.airavata.model.status
namespace py apache.airavata.model.status
-enum State {
+enum ExperimentState {
CREATED,
VALIDATED,
SCHEDULED,
@@ -31,11 +31,39 @@ enum State {
EXECUTING,
CANCELING,
CANCELED,
- SUSPENDED,
COMPLETED,
FAILED
}
+enum TaskState {
+ CREATED,
+ EXECUTING,
+ COMPLETED,
+ FAILED,
+ CANCELED
+}
+
+enum ProcessState {
+ CREATED,
+ VALIDATED,
+ EXECUTING,
+ COMPLETED,
+ FAILED,
+ CANCELLING,
+ CANCELED
+}
+
+enum JobState {
+ SUBMITTED,
+ QUEUED,
+ ACTIVE,
+ COMPLETE,
+ CANCELED,
+ FAILED,
+ SUSPENDED,
+ UNKNOWN
+}
+
/**
* Status: A generic status object.
*
@@ -49,8 +77,26 @@ enum State {
* User friendly reason on how the state is inferred.
*
*/
-struct Status {
- 1: required State state,
+struct ExperimentStatus {
+ 1: required ExperimentState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct ProcessStatus {
+ 1: required ProcessState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct TaskStatus {
+ 1: required TaskState state,
+ 2: optional i64 timeOfStateChange,
+ 3: optional string reason
+}
+
+struct JobStatus {
+ 1: required JobState jobState,
2: optional i64 timeOfStateChange,
- 3: string reason
+ 3: optional string reason
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e76851d0/thrift-interface-descriptions/airavata-data-models/task_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-data-models/task_model.thrift b/thrift-interface-descriptions/airavata-data-models/task_model.thrift
index ee82c74..43f89e9 100644
--- a/thrift-interface-descriptions/airavata-data-models/task_model.thrift
+++ b/thrift-interface-descriptions/airavata-data-models/task_model.thrift
@@ -53,10 +53,10 @@ struct TaskModel {
3: required string parentProcessId,
4: required i64 creationTime,
5: required i64 lastUpdateTime,
- 6: required status_models.Status taskStatus,
+ 6: required status_models.TaskStatus taskStatus,
7: optional string taskDetail,
8: optional byte taskInternalStore,
- 9: optional airavata_commons.ErrorDetails taskErrorDetails
+ 9: optional airavata_commons.ErrorModel taskError
}
/**