You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 20:05:48 UTC
[1/5] committing the initial version of zk work with resubmitting all
the failed jobs to the available gfac cluster nodes
Repository: airavata
Updated Branches:
refs/heads/master f723d4937 -> d56dd443e
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
new file mode 100644
index 0000000..24b3989
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+ private ZooKeeper zk;
+
+ private Integer mutex = -1;
+
+ @Subscribe
+ public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws KeeperException, InterruptedException, ApplicationSettingsException {
+ logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
+ MonitorID monitorID = statusChangeRequest.getMonitorID();
+ String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments") +
+ File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID();
+ Stat exists = null;
+ try {
+ if (!zk.getState().isConnected()) {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ zk = new ZooKeeper(zkhostPort, 6000, this);
+ synchronized (mutex){
+ mutex.wait();
+ }
+ }
+ exists = zk.exists(experimentPath, false);// this znode is created by orchestrator so it has to exist at this level
+ if (exists == null) {
+ logger.error("ZK path: " + experimentPath + " does not exists !!");
+ logger.error("Zookeeper is in an inconsistent state !!! ");
+ return;
+ }
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ switch (statusChangeRequest.getState()) {
+ case COMPLETED:
+ zk.delete(experimentPath, exists.getVersion());
+ break;
+ case FAILED:
+ zk.delete(experimentPath, exists.getVersion());
+ break;
+ default:
+ zk.setData(experimentPath, (statusChangeRequest.getMonitorID().getJobID() +
+ "," + statusChangeRequest.getMonitorID().getWorkflowNodeID()).getBytes(), exists.getVersion());
+ }
+ }
+
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof ZooKeeper) {
+ this.zk = (ZooKeeper) configuration;
+ }
+ }
+ }
+
+ public void process(WatchedEvent watchedEvent) {
+ synchronized (mutex) {
+ Event.KeeperState state = watchedEvent.getState();
+ if (state == Event.KeeperState.SyncConnected) {
+ mutex.notify();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 84978f1..8456e35 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -78,10 +78,14 @@ public class MonitorID {
this.jobExecutionContext = jobExecutionContext;
host = jobExecutionContext.getApplicationContext().getHostDescription();
userName = jobExecutionContext.getExperiment().getUserName();
- jobID = jobExecutionContext.getJobDetails().getJobID();
taskID = jobExecutionContext.getTaskData().getTaskID();
experimentID = jobExecutionContext.getExperiment().getExperimentID();
workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
+ try {
+ jobID = jobExecutionContext.getJobDetails().getJobID();
+ }catch(NullPointerException e){
+ logger.error("There is not job created at this point");
+ }
}
public HostDescription getHost() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
new file mode 100644
index 0000000..50a60de
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+
+/**
+ * State-change request pairing a MonitorID and its JobIdentity with the GfacExperimentState to record.
+ */
+public class GfacExperimentStateChangeRequest {
+    private MonitorID monitorID;
+    private JobIdentity identity;
+    private GfacExperimentState state;
+
+    /** Derives the job identity from the experiment, workflow-node, task and job ids carried by monitorID. */
+    public GfacExperimentStateChangeRequest(MonitorID monitorID, GfacExperimentState state) {
+        this(monitorID, new JobIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+                monitorID.getTaskID(), monitorID.getJobID()), state);
+    }
+
+    /** Uses the caller-supplied job identity as is. */
+    public GfacExperimentStateChangeRequest(MonitorID monitorID, JobIdentity jobId, GfacExperimentState state) {
+        setIdentity(jobId);
+        setMonitorID(monitorID);
+        this.state = state;
+    }
+
+    public MonitorID getMonitorID() {
+        return this.monitorID;
+    }
+
+    public void setMonitorID(MonitorID monitorID) {
+        this.monitorID = monitorID;
+    }
+
+    public JobIdentity getIdentity() {
+        return this.identity;
+    }
+
+    public void setIdentity(JobIdentity identity) {
+        this.identity = identity;
+    }
+
+    public GfacExperimentState getState() {
+        return this.state;
+    }
+
+    public void setState(GfacExperimentState state) {
+        this.state = state;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 7b24eb9..6e3f59e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac.core.provider;
import com.google.common.eventbus.EventBus;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -44,8 +45,11 @@ public abstract class AbstractProvider implements GFacProvider{
protected MonitorPublisher monitorPublisher;
- protected AbstractProvider() {
+ protected AbstractProvider() { //todo this has to be fixed
this.monitorPublisher = GFacImpl.getMonitorPublisher();
+ if(this.monitorPublisher == null){
+ this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
+ }
}
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
new file mode 100644
index 0000000..8d06aad
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
@@ -0,0 +1,82 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+@SuppressWarnings("all") public enum GfacExperimentState implements org.apache.thrift.TEnum {
+ INHANDLERSINVOKING(0),
+ INHANDLERSINVOKED(1),
+ PROVIDERINVOKING(2),
+ PROVIDERINVOKED(3),
+ OUTHANDLERSINVOKING(4),
+ OUTHANDLERSINVOKED(5),
+ COMPLETED(6), // GfacInternalStatusUpdator.updateZK deletes the experiment znode on this state
+ FAILED(7), // handled like COMPLETED by GfacInternalStatusUpdator.updateZK (znode deleted)
+ UNKNOWN(8);
+
+ private final int value; // integer this constant was constructed with; returned by getValue()
+
+ private GfacExperimentState(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static GfacExperimentState findByValue(int value) {
+ switch (value) { // cases mirror the integers passed to the constants above
+ case 0:
+ return INHANDLERSINVOKING;
+ case 1:
+ return INHANDLERSINVOKED;
+ case 2:
+ return PROVIDERINVOKING;
+ case 3:
+ return PROVIDERINVOKED;
+ case 4:
+ return OUTHANDLERSINVOKING;
+ case 5:
+ return OUTHANDLERSINVOKED;
+ case 6:
+ return COMPLETED;
+ case 7:
+ return FAILED;
+ case 8:
+ return UNKNOWN;
+ default:
+ return null; // no constant carries this value
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
new file mode 100644
index 0000000..3b9273d
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
@@ -0,0 +1,516 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class GfacExperimentStatus implements org.apache.thrift.TBase<GfacExperimentStatus, GfacExperimentStatus._Fields>, java.io.Serializable, Cloneable, Comparable<GfacExperimentStatus> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GfacExperimentStatus");
+
+ private static final org.apache.thrift.protocol.TField GFAC_EXPERIMENT_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("gfacExperimentState", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField TIME_OF_STATE_CHANGE_FIELD_DESC = new org.apache.thrift.protocol.TField("timeOfStateChange", org.apache.thrift.protocol.TType.I64, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // serializer factory per protocol scheme; consulted by read()/write()
+ static {
+ schemes.put(StandardScheme.class, new GfacExperimentStatusStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new GfacExperimentStatusTupleSchemeFactory());
+ }
+
+ /**
+ *
+ * @see GfacExperimentState
+ */
+ public GfacExperimentState gfacExperimentState; // required
+ public long timeOfStateChange; // optional
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see GfacExperimentState
+ */
+ GFAC_EXPERIMENT_STATE((short)1, "gfacExperimentState"),
+ TIME_OF_STATE_CHANGE((short)2, "timeOfStateChange");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // GFAC_EXPERIMENT_STATE
+ return GFAC_EXPERIMENT_STATE;
+ case 2: // TIME_OF_STATE_CHANGE
+ return TIME_OF_STATE_CHANGE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __TIMEOFSTATECHANGE_ISSET_ID = 0; // bit index of timeOfStateChange within __isset_bitfield
+ private byte __isset_bitfield = 0; // records whether the primitive timeOfStateChange has been assigned
+ private _Fields optionals[] = {_Fields.TIME_OF_STATE_CHANGE};
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.GFAC_EXPERIMENT_STATE, new org.apache.thrift.meta_data.FieldMetaData("gfacExperimentState", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, GfacExperimentState.class)));
+ tmpMap.put(_Fields.TIME_OF_STATE_CHANGE, new org.apache.thrift.meta_data.FieldMetaData("timeOfStateChange", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GfacExperimentStatus.class, metaDataMap);
+ }
+
+ public GfacExperimentStatus() {
+ }
+
+ public GfacExperimentStatus(
+ GfacExperimentState gfacExperimentState)
+ {
+ this();
+ this.gfacExperimentState = gfacExperimentState;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public GfacExperimentStatus(GfacExperimentStatus other) {
+ __isset_bitfield = other.__isset_bitfield;
+ if (other.isSetGfacExperimentState()) {
+ this.gfacExperimentState = other.gfacExperimentState;
+ }
+ this.timeOfStateChange = other.timeOfStateChange; // copied unconditionally; its isset bit was copied with the bitfield above
+ }
+
+ public GfacExperimentStatus deepCopy() {
+ return new GfacExperimentStatus(this);
+ }
+
+ @Override
+ public void clear() {
+ this.gfacExperimentState = null;
+ setTimeOfStateChangeIsSet(false);
+ this.timeOfStateChange = 0;
+ }
+
+ /**
+ *
+ * @see GfacExperimentState
+ */
+ public GfacExperimentState getGfacExperimentState() {
+ return this.gfacExperimentState;
+ }
+
+ /**
+ *
+ * @see GfacExperimentState
+ */
+ public GfacExperimentStatus setGfacExperimentState(GfacExperimentState gfacExperimentState) {
+ this.gfacExperimentState = gfacExperimentState;
+ return this;
+ }
+
+ public void unsetGfacExperimentState() {
+ this.gfacExperimentState = null;
+ }
+
+ /** Returns true if field gfacExperimentState is set (has been assigned a value) and false otherwise */
+ public boolean isSetGfacExperimentState() {
+ return this.gfacExperimentState != null;
+ }
+
+ public void setGfacExperimentStateIsSet(boolean value) {
+ if (!value) { // passing true changes nothing: "set" is defined as the field being non-null
+ this.gfacExperimentState = null;
+ }
+ }
+
+ public long getTimeOfStateChange() {
+ return this.timeOfStateChange;
+ }
+
+ public GfacExperimentStatus setTimeOfStateChange(long timeOfStateChange) {
+ this.timeOfStateChange = timeOfStateChange;
+ setTimeOfStateChangeIsSet(true);
+ return this;
+ }
+
+ public void unsetTimeOfStateChange() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
+ }
+
+ /** Returns true if field timeOfStateChange is set (has been assigned a value) and false otherwise */
+ public boolean isSetTimeOfStateChange() {
+ return EncodingUtils.testBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
+ }
+
+ public void setTimeOfStateChangeIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case GFAC_EXPERIMENT_STATE:
+ if (value == null) {
+ unsetGfacExperimentState();
+ } else {
+ setGfacExperimentState((GfacExperimentState)value);
+ }
+ break;
+
+ case TIME_OF_STATE_CHANGE:
+ if (value == null) {
+ unsetTimeOfStateChange();
+ } else {
+ setTimeOfStateChange((Long)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case GFAC_EXPERIMENT_STATE:
+ return getGfacExperimentState();
+
+ case TIME_OF_STATE_CHANGE:
+ return Long.valueOf(getTimeOfStateChange());
+
+ }
+ throw new IllegalStateException(); // reached only for a _Fields constant without a case above
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case GFAC_EXPERIMENT_STATE:
+ return isSetGfacExperimentState();
+ case TIME_OF_STATE_CHANGE:
+ return isSetTimeOfStateChange();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof GfacExperimentStatus)
+ return this.equals((GfacExperimentStatus)that);
+ return false;
+ }
+
+ public boolean equals(GfacExperimentStatus that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_gfacExperimentState = true && this.isSetGfacExperimentState();
+ boolean that_present_gfacExperimentState = true && that.isSetGfacExperimentState();
+ if (this_present_gfacExperimentState || that_present_gfacExperimentState) {
+ if (!(this_present_gfacExperimentState && that_present_gfacExperimentState))
+ return false;
+ if (!this.gfacExperimentState.equals(that.gfacExperimentState))
+ return false;
+ }
+
+ boolean this_present_timeOfStateChange = true && this.isSetTimeOfStateChange();
+ boolean that_present_timeOfStateChange = true && that.isSetTimeOfStateChange();
+ if (this_present_timeOfStateChange || that_present_timeOfStateChange) {
+ if (!(this_present_timeOfStateChange && that_present_timeOfStateChange))
+ return false;
+ if (this.timeOfStateChange != that.timeOfStateChange) // compared only when both sides have the field set
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0; // NOTE(review): constant hash is consistent with equals() but puts every instance in one bucket; presumably generated without Thrift's hashcode option - confirm
+ }
+
+ @Override
+ public int compareTo(GfacExperimentStatus other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetGfacExperimentState()).compareTo(other.isSetGfacExperimentState());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGfacExperimentState()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gfacExperimentState, other.gfacExperimentState);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetTimeOfStateChange()).compareTo(other.isSetTimeOfStateChange());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTimeOfStateChange()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timeOfStateChange, other.timeOfStateChange);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this); // picks the factory registered for the protocol's scheme class
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("GfacExperimentStatus(");
+ boolean first = true;
+
+ sb.append("gfacExperimentState:");
+ if (this.gfacExperimentState == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.gfacExperimentState);
+ }
+ first = false;
+ if (isSetTimeOfStateChange()) { // the optional field is printed only when set
+ if (!first) sb.append(", ");
+ sb.append("timeOfStateChange:");
+ sb.append(this.timeOfStateChange);
+ first = false;
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (gfacExperimentState == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'gfacExperimentState' was not present! Struct: " + toString());
+ }
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class GfacExperimentStatusStandardSchemeFactory implements SchemeFactory {
+ public GfacExperimentStatusStandardScheme getScheme() {
+ return new GfacExperimentStatusStandardScheme();
+ }
+ }
+
+ private static class GfacExperimentStatusStandardScheme extends StandardScheme<GfacExperimentStatus> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // GFAC_EXPERIMENT_STATE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32()); // null for an unrecognized value; validate() below then rejects the struct
+ struct.setGfacExperimentStateIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // TIME_OF_STATE_CHANGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.timeOfStateChange = iprot.readI64();
+ struct.setTimeOfStateChangeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // field ids other than 1 and 2 are skipped
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.gfacExperimentState != null) {
+ oprot.writeFieldBegin(GFAC_EXPERIMENT_STATE_FIELD_DESC);
+ oprot.writeI32(struct.gfacExperimentState.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (struct.isSetTimeOfStateChange()) {
+ oprot.writeFieldBegin(TIME_OF_STATE_CHANGE_FIELD_DESC);
+ oprot.writeI64(struct.timeOfStateChange);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class GfacExperimentStatusTupleSchemeFactory implements SchemeFactory {
+ public GfacExperimentStatusTupleScheme getScheme() {
+ return new GfacExperimentStatusTupleScheme();
+ }
+ }
+
+ private static class GfacExperimentStatusTupleScheme extends TupleScheme<GfacExperimentStatus> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeI32(struct.gfacExperimentState.getValue()); // no validate() call in this scheme: a null state raises NullPointerException here
+ BitSet optionals = new BitSet();
+ if (struct.isSetTimeOfStateChange()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1); // one optional field, so one presence bit
+ if (struct.isSetTimeOfStateChange()) {
+ oprot.writeI64(struct.timeOfStateChange);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32()); // may be null for an unrecognized value; this scheme does not call validate()
+ struct.setGfacExperimentStateIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.timeOfStateChange = iprot.readI64();
+ struct.setTimeOfStateChangeIsSet(true);
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
new file mode 100644
index 0000000..20099f2
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
@@ -0,0 +1,59 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder for the string constants of the GFac data model. The header above marks this
+ * file as generated by the Thrift compiler, so changes belong in the .thrift source.
+ */
+@SuppressWarnings("all") public class gfacDataModelConstants {
+
+ // NOTE(review): the value suggests a placeholder id that clients must not assign themselves - confirm.
+ public static final String DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
+
+ // Literal "DEFAULT"; presumably the project name used when none is supplied - confirm against callers.
+ public static final String DEFAULT_PROJECT_NAME = "DEFAULT";
+
+ // Literal "SINGLE_APP_NODE"; presumably the node name for single-application experiments - confirm.
+ public static final String SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE";
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh b/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
index 73cdba3..f2adf51 100755
--- a/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
+++ b/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
@@ -118,6 +118,8 @@ rm -rf ${JAVA_GEN_DIR}
# Using thrify Java generator, generate the java classes based on Airavata API. This
# The airavataAPI.thrift includes rest of data models.
thrift ${THRIFT_ARGS} --gen java gfac.cpi.service.thrift || fail unable to generate java thrift classes
+thrift ${THRIFT_ARGS} --gen java gfacDataModel.thrift || fail unable to generate java thrift classes
+
# For the generated java classes add the ASF V2 License header
add_license_header $JAVA_GEN_DIR
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift b/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
new file mode 100644
index 0000000..2438e3c
--- /dev/null
+++ b/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+include "applicationCatalogDataModel.thrift"
+
+namespace java org.apache.airavata.gfac.workspace.experiment
+namespace php Airavata.Model.Workspace.Experiment
+
+/*
+ * This file describes the definitions of the Gfac Framework level Experiment Data Structures. Each of the
+ * language specific Airavata Client SDK's will translate this neutral data model into an
+ * appropriate form for passing to the Airavata Server Execution API Calls.
+ *
+ * This data model is not visible to outside users; it is used inside GFac to recover
+ * failed or hung jobs.
+ *
+*/
+
+const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
+const string DEFAULT_PROJECT_NAME = "DEFAULT"
+const string SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE"
+
+/*
+ * States recorded for an experiment while GFac processes it.
+ * No explicit values are given, so the numeric value of each entry is implied by its
+ * position (presumably starting at 0 - confirm against the Thrift IDL rules). Append
+ * new states at the end rather than reordering, since the tuple scheme sends the
+ * numeric value on the wire.
+ */
+enum GfacExperimentState {
+ INHANDLERSINVOKING,
+ INHANDLERSINVOKED,
+ PROVIDERINVOKING,
+ PROVIDERINVOKED,
+ OUTHANDLERSINVOKING,
+ OUTHANDLERSINVOKED,
+ COMPLETED,
+ FAILED,
+ UNKNOWN
+}
+
+/*
+ * Status of an experiment inside GFac.
+ *   gfacExperimentState - required; the current state.
+ *   timeOfStateChange   - optional 64-bit time of the last state change
+ *                         (unit not stated here; presumably epoch milliseconds - confirm).
+ * Field ids 1 and 2 identify the fields on the wire and must not be renumbered.
+ */
+struct GfacExperimentStatus {
+ 1: required GfacExperimentState gfacExperimentState,
+ 2: optional i64 timeOfStateChange
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
index df24a9e..4fce52f 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -53,6 +53,7 @@ public class OrchestratorServer implements IServer{
throws Exception {
try {
final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_PORT,"8940"));
+
final String serverHost = ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_HOST, null);
TServerTransport serverTransport;
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 2c66293..15e2c63 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -21,11 +21,18 @@
package org.apache.airavata.orchestrator.server;
+import java.io.File;
+import java.io.IOException;
import java.util.Calendar;
import java.util.List;
+import java.util.Random;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.orchestrator.util.OrchestratorRecoveryHandler;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.cpi.OrchestratorService;
import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
@@ -37,16 +44,23 @@ import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
import org.apache.thrift.TException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class OrchestratorServerHandler implements OrchestratorService.Iface {
+public class OrchestratorServerHandler implements OrchestratorService.Iface, Watcher {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
private SimpleOrchestratorImpl orchestrator = null;
private Registry registry;
+ private ZooKeeper zk;
+
+ private static Integer mutex = new Integer(-1);
+ ;
+
/**
* Query orchestrator server to fetch the CPI version
*/
@@ -57,18 +71,59 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
public OrchestratorServerHandler() {
+ // Register this orchestrator in ZooKeeper: make sure the persistent parent node exists,
+ // add an ephemeral child holding this server's host:port, then watch the gfac server node.
+ try {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String airavataServerHostPort = ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+ try {
+ zk = new ZooKeeper(zkhostPort, 6000, this); // this handler is the session watcher; process() notifies the mutex on SyncConnected
+ String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+ // NOTE(review): wait() is not guarded by a condition, so a notify() delivered before this
+ // point looks like it would be missed and the constructor would block - confirm.
+ synchronized (mutex) {
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ Stat zkStat = zk.exists(OrchServer, false);
+ if (zkStat == null) {
+ zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ // NOTE(review): File.separator is OS-dependent while ZooKeeper paths use '/' - confirm for non-Unix hosts.
+ String instantNode = OrchServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
+ zkStat = zk.exists(instantNode, false);
+ if (zkStat == null) {
+ zk.create(instantNode,
+ airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL); // other components watch the creation/deletion of these children to monitor the status of the node
+ }
+ // creating a watch in orchestrator to monitor the gfac instances
+ zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"), this);
+ log.info("Finished starting ZK: " + zk);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ }
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ }
+ // orchestrator init
try {
// first constructing the monitorManager and orchestrator, then fill the required properties
orchestrator = new SimpleOrchestratorImpl();
registry = RegistryFactory.getDefaultRegistry();
orchestrator.initialize();
- }catch (OrchestratorException e) {
+ // zk is still null here when the ZooKeeper constructor or a settings lookup above threw,
+ // because those exceptions are only printed.
+ orchestrator.getOrchestratorContext().setZk(this.zk);
+ } catch (OrchestratorException e) {
e.printStackTrace();
} catch (RegistryException e) {
e.printStackTrace();
}
}
+
+
/**
* * After creating the experiment Data user have the
* * experimentID as the handler to the experiment, during the launchExperiment
@@ -82,11 +137,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
* @param experimentId
*/
public boolean launchExperiment(String experimentId) throws TException {
- Experiment experiment= null;
+ Experiment experiment = null;
try {
- List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL,WorkflowNodeConstants.EXPERIMENT_ID,experimentId);
+ List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
for (String workflowNodeId : ids) {
- WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails)registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+ WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId);
for (Object o : taskDetailList) {
TaskDetails taskID = (TaskDetails) o;
@@ -116,11 +171,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
experiment.setExperimentStatus(status);
try {
- registry.update(RegistryModelType.EXPERIMENT, experiment, experimentId);
- } catch (RegistryException e1) {
- throw new TException(e);
- }
-
+ registry.update(RegistryModelType.EXPERIMENT, experiment, experimentId);
+ } catch (RegistryException e1) {
+ throw new TException(e);
+ }
+
throw new TException(e);
}
return true;
@@ -130,11 +185,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
/**
* This method will validate the experiment before launching, if is failed we do not run the launch in airavata
* thrift service (only if validation is enabled
+ *
* @param experimentId
* @return
* @throws TException
*/
- public boolean validateExperiment(String experimentId) throws TException,LaunchValidationException {
+ public boolean validateExperiment(String experimentId) throws TException, LaunchValidationException {
//TODO: Write the Orchestrator implementaion
try {
List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
@@ -167,17 +223,68 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
/**
* This can be used to cancel a running experiment and store the status to terminated in registry
+ *
* @param experimentId
* @return
* @throws TException
*/
public boolean terminateExperiment(String experimentId) throws TException {
- try {
- orchestrator.cancelExperiment(experimentId);
- } catch (OrchestratorException e) {
- log.error("Error canceling experiment "+experimentId,e);
- return false;
- }
+ try {
+ orchestrator.cancelExperiment(experimentId);
+ } catch (OrchestratorException e) {
+ log.error("Error canceling experiment " + experimentId, e);
+ return false;
+ }
return true;
}
+
+ /** This method gracefully handler gfac node failures */
+ synchronized public void process(WatchedEvent watchedEvent) {
+ synchronized (mutex) {
+ try {
+ Event.KeeperState state = watchedEvent.getState();
+ switch (state) {
+ case SyncConnected:
+ mutex.notify();
+ break;
+ }
+ if (watchedEvent.getPath() != null && watchedEvent.getPath().startsWith(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"))) {
+ List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"), true);
+ for (String gfacNodes : children) {
+ zk.exists(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server") +
+ File.separator + gfacNodes, this);
+ }
+ switch (watchedEvent.getType()) {
+ case NodeCreated:
+ mutex.notify();
+ break;
+ case NodeDeleted:
+ // here we have to handle gfac node shutdown case
+ if (children.size() == 0) {
+ log.error("There are not gfac instances to route failed jobs");
+ return;
+ }
+ // we recover one gfac node at a time
+ final WatchedEvent event = watchedEvent;
+ final OrchestratorServerHandler handler = this;
+ (new Thread() {
+ public void run() {
+ try {
+ (new OrchestratorRecoveryHandler(handler, event.getPath())).recover(); // run this task in a separate thread
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("error recovering the jobs for gfac-node: " + event.getPath());
+ }
+ }
+ }).start();
+ break;
+ }
+ }
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
new file mode 100644
index 0000000..05a87e7
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.server.OrchestratorServerHandler;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class OrchestratorRecoveryHandler implements Watcher{
+ private static Logger log = LoggerFactory.getLogger(OrchestratorRecoveryHandler.class);
+
+ private ZooKeeper zk;
+
+ private String gfacId;
+
+ private String zkExperimentPath;
+
+ private static Integer mutex = -1;
+
+ private OrchestratorServerHandler serverHandler;
+
+ public OrchestratorRecoveryHandler(OrchestratorServerHandler handler,String zkExpPath) {
+ this.zk = zk;
+ int index = zkExpPath.split(File.separator).length - 1;
+ this.gfacId = zkExpPath.split(File.separator)[index];
+ this.zkExperimentPath = zkExpPath;
+ this.serverHandler = handler;
+ }
+
+ /**
+ * This method return the list of experimentId
+ * @return
+ * @throws OrchestratorException
+ * @throws ApplicationSettingsException
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ zk = new ZooKeeper(zkhostPort, 6000, this);
+ synchronized (mutex) {
+ mutex.wait();
+ }
+ List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
+ + File.separator + gfacId, false);
+ for (String expId : children) {
+ log.info("Recovering1 Experiment: " + expId.split("\\+")[0]);
+ try {
+ serverHandler.launchExperiment(expId.split("\\+")[0]);
+ //return of above call gaurantee that experiment is persist some gfac node in the cluster
+ Stat exists = zk.exists(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
+ + File.separator + gfacId + File.separator + expId, false);
+ if(exists != null) {
+ zk.delete(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
+ + File.separator + gfacId + File.separator + expId, exists.getVersion());
+ }
+ } catch (Exception e) { // we attempt all the experiments
+ e.printStackTrace();
+ }
+ }
+ }
+
+ synchronized public void process(WatchedEvent watchedEvent) {
+ synchronized (mutex) {
+ Event.KeeperState state = watchedEvent.getState();
+ switch (state) {
+ case SyncConnected:
+ mutex.notify();
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 3912dd1..95a45a5 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -126,6 +126,12 @@ the License. -->
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
</dependency>
+ <!-- zookeeper dependencies -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-server-configuration</artifactId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 8de0482..be3acda 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -27,6 +27,7 @@ import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.zookeeper.ZooKeeper;
/**
* This is the context object used in orchestrator which
@@ -39,6 +40,8 @@ public class OrchestratorContext {
private AiravataRegistry2 registry;
private Registry newRegistry;
+
+ private ZooKeeper zk;
public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
this.gfacInstanceList = new ArrayList<GFACInstance>();
@@ -78,4 +81,16 @@ public class OrchestratorContext {
public void setNewRegistry(Registry newRegistry) {
this.newRegistry = newRegistry;
}
+
+ /**
+ * Replaces the list of GFac instances held by this context.
+ * The given list is stored by reference, not copied.
+ */
+ public void setGfacInstanceList(List<GFACInstance> gfacInstanceList) {
+ this.gfacInstanceList = gfacInstanceList;
+ }
+
+ /** Stores the ZooKeeper client handle in this context. */
+ public void setZk(ZooKeeper zk) {
+ this.zk = zk;
+ }
+
+ /** Returns the ZooKeeper client handle stored by {@code setZk}; null if none was set. */
+ public ZooKeeper getZk() {
+ return zk;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
index 60ecd58..3ec1e8c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
@@ -28,7 +28,7 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class GFacClientFactory {
- public static GfacService.Client createOrchestratorClient(String serverHost, int serverPort){
+ public static GfacService.Client createGFacClient(String serverHost, int serverPort){
try {
TTransport transport = new TSocket(serverHost, serverPort);
transport.open();
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 6167134..23089aa 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -20,28 +20,39 @@
*/
package org.apache.airavata.orchestrator.core.impl;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.orchestrator.core.gfac.GFacClientFactory;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
import org.apache.thrift.TException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
/*
* this class is responsible for submitting a job to gfac in service mode,
* it will select a gfac instance based on the incoming request and submit to that
* gfac instance.
*/
-public class GFACServiceJobSubmitter implements JobSubmitter {
+public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
+ public static final String IP = "ip";
private OrchestratorContext orchestratorContext;
+ private static Integer mutex = -1;
+
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
}
@@ -52,12 +63,65 @@ public class GFACServiceJobSubmitter implements JobSubmitter {
}
public boolean submit(String experimentID, String taskID) throws OrchestratorException {
- GfacService.Client localhost = GFacClientFactory.createOrchestratorClient("localhost",
- Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950")));
+ ZooKeeper zk = orchestratorContext.getZk();
+ int retryCount = 0;
try {
- return localhost.submitJob(experimentID, taskID);
+ if (!zk.getState().isConnected()) {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ zk = new ZooKeeper(zkhostPort, 6000, this);
+ synchronized (mutex){
+ mutex.wait();
+ }
+ }
+ String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+ String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ List<String> children = zk.getChildren(gfacServer, this);
+
+ String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
+ // here we are not using an index because the getChildren does not return the same order everytime
+
+ String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
+ logger.info("GFAC instance node data: " + gfacNodeData);
+ String[] split = gfacNodeData.split(":");
+ GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+ if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node
+ String experimentPath = experimentNode + File.separator + pickedChild;
+ String newExpNode = experimentPath + File.separator + experimentID + "+" + taskID;
+ Stat exists1 = zk.exists(newExpNode, this);
+ if (exists1 == null) {
+ zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ logger.info("ExperimentID: " + experimentID + " taskID: " + taskID + " is re-running due to gfac failure");
+ }
+ return localhost.submitJob(experimentID, taskID);
+ }
} catch (TException e) {
throw new OrchestratorException(e);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+    /**
+     * Watcher callback: notifies the thread waiting on the mutex when the session reports
+     * SyncConnected, and again when the event type is NodeCreated.
+     */
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            final boolean sessionConnected = event.getState() == Event.KeeperState.SyncConnected;
+            if (sessionConnected) {
+                mutex.notify();
+            }
+            final boolean nodeCreated = event.getType() == Event.EventType.NodeCreated;
+            if (nodeCreated) {
+                mutex.notify();
+            }
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
index 5bbe4e6..7c5cb85 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
@@ -59,7 +59,9 @@ public abstract class AbstractOrchestrator implements Orchestrator {
protected OrchestratorConfiguration orchestratorConfiguration;
private String registryURL;
+
private String gatewayName;
+
private String airavataUserName;
public String getRegistryURL() {
@@ -162,4 +164,16 @@ public abstract class AbstractOrchestrator implements Orchestrator {
}
return airavataAPI;
}
+
+ /** Returns the {@code airavataRegistry} field of this orchestrator. */
+ public AiravataRegistry2 getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ /** Returns the {@code orchestratorContext} field of this orchestrator. */
+ public OrchestratorContext getOrchestratorContext() {
+ return orchestratorContext;
+ }
+
+ /** Returns the {@code orchestratorConfiguration} field of this orchestrator. */
+ public OrchestratorConfiguration getOrchestratorConfiguration() {
+ return orchestratorConfiguration;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 5f19c2f..fce6dde 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -191,4 +191,5 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
public void initialize() throws OrchestratorException {
}
+
}
[3/5] git commit: more zk related changes
Posted by la...@apache.org.
more zk related changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e86504aa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e86504aa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e86504aa
Branch: refs/heads/master
Commit: e86504aae30401d109690d8814cbf4bcc6f77ae9
Parents: 362da4e
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 13:46:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 13:46:33 2014 -0400
----------------------------------------------------------------------
.../airavata/client/tools/DocumentCreator.java | 2 +-
modules/commons/utils/pom.xml | 12 +-
.../airavata/common/utils/AiravataZKUtils.java | 59 +++
.../airavata/gfac/server/GfacServerHandler.java | 13 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 399 +++++++++-----
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 3 +-
.../core/monitor/GfacInternalStatusUpdator.java | 24 +-
.../state/GfacExperimentStateChangeRequest.java | 2 +-
.../airavata/gfac/core/utils/GFacUtils.java | 21 +
.../gfac/core/utils/GfacExperimentState.java | 81 +++
.../experiment/GfacExperimentState.java | 82 ---
.../experiment/GfacExperimentStatus.java | 516 -------------------
.../experiment/gfacDataModelConstants.java | 59 ---
.../server/OrchestratorServerHandler.java | 5 +-
.../core/impl/GFACServiceJobSubmitter.java | 6 +
15 files changed, 483 insertions(+), 801 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index ffcff17..d573da9 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
- applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
+ applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 151a141..aca8fea 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -8,7 +8,8 @@
ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.airavata</groupId>
@@ -96,7 +97,7 @@
<artifactId>tomcat-embed-core</artifactId>
<version>7.0.22</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
@@ -107,7 +108,7 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
@@ -122,6 +123,11 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
new file mode 100644
index 0000000..7349b7e
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.common.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.File;
+
+public class AiravataZKUtils {
+ public static final String ZK_EXPERIMENT_STATE_NODE = "state";
+
+ public static String getExpZnodePath(String experimentId, String taskId) throws ApplicationSettingsException {
+ return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) +
+ File.separator +
+ ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator
+ + experimentId + "+" + taskId;
+ }
+
+ public static String getZKhostPort() throws ApplicationSettingsException {
+ return ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT);
+ }
+
+ public static String getExpStatePath(String experimentId, String taskId) throws ApplicationSettingsException {
+ return AiravataZKUtils.getExpZnodePath(experimentId, taskId) +
+ File.separator +
+ "state";
+ }
+
+ public static String getExpState(ZooKeeper zk,String expId,String tId) throws ApplicationSettingsException,
+ KeeperException, InterruptedException {
+ Stat exists = zk.exists(getExpStatePath(expId, tId), false);
+ if(exists != null) {
+ return new String(zk.getData(getExpStatePath(expId, tId),false, exists));
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 27733f9..60a0f21 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.server;
import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.exception.AiravataConfigurationException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.GFacException;
@@ -74,8 +75,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
public GfacServerHandler() {
// registering with zk
try {
- String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
- + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String zkhostPort = AiravataZKUtils.getZKhostPort();
String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
try {
@@ -140,6 +140,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
if (state == Event.KeeperState.SyncConnected) {
mutex.notify();
connected = true;
+ } else if(state == Event.KeeperState.Expired ||
+ state == Event.KeeperState.Disconnected){
+ try {
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(),6000,this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 195bfc1..6fb3e24 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -28,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import com.google.common.eventbus.EventBus;
-
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
@@ -59,12 +57,12 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,12 +86,12 @@ public class BetterGfacImpl implements GFac {
private AiravataRegistry2 airavataRegistry2;
private ZooKeeper zk; // we are not storing zk instance in to jobExecution context
-
+
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static File gfacConfigFile;
- private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
+ private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private static MonitorPublisher monitorPublisher;
@@ -114,35 +112,36 @@ public class BetterGfacImpl implements GFac {
this.zk = zooKeeper;
}
- public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+ public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry,zk);
+ abstractActivityListener.setup(publisher, registry, zk);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
- }catch (ClassNotFoundException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (ClassNotFoundException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
} catch (InstantiationException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
} catch (IllegalAccessException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
- } catch (ApplicationSettingsException e){
- log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
+ } catch (ApplicationSettingsException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties", e);
}
}
- public static void startDaemonHandlers() {
+
+ public static void startDaemonHandlers() {
List<GFacHandlerConfig> daemonHandlerConfig = null;
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
gfacConfigFile = new File(resource.getPath());
try {
daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
} catch (ParserConfigurationException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
} catch (IOException e) {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
} catch (SAXException e) {
@@ -151,14 +150,14 @@ public class BetterGfacImpl implements GFac {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
}
- for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+ for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
String className = handlerConfig.getClassName();
try {
Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
threadedHandler.initProperties(handlerConfig.getProperties());
daemonHandlers.add(threadedHandler);
- }catch (ClassNotFoundException e){
+ } catch (ClassNotFoundException e) {
log.error("Error initializing the handler: " + className);
log.error(className + " class has to implement " + ThreadedHandler.class);
} catch (InstantiationException e) {
@@ -173,7 +172,7 @@ public class BetterGfacImpl implements GFac {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
- for(ThreadedHandler tHandler:daemonHandlers){
+ for (ThreadedHandler tHandler : daemonHandlers) {
(new Thread(tHandler)).start();
}
}
@@ -194,7 +193,7 @@ public class BetterGfacImpl implements GFac {
* @return
* @throws GFacException
*/
- public boolean submitJob(String experimentID,String taskID) throws GFacException {
+ public boolean submitJob(String experimentID, String taskID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID);
@@ -216,31 +215,31 @@ public class BetterGfacImpl implements GFac {
// 2. Add another property to jobExecutionContext and read them inside the provider and use it.
String serviceName = taskData.getApplicationId();
if (serviceName == null) {
- throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName);
}
-
+
ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
- if (serviceDescription == null ) {
- throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ if (serviceDescription == null) {
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName);
}
String hostName;
HostDescription hostDescription = null;
- if(taskData.getTaskScheduling().getResourceHostId() != null){
+ if (taskData.getTaskScheduling().getResourceHostId() != null) {
hostName = taskData.getTaskScheduling().getResourceHostId();
hostDescription = airavataRegistry2.getHostDescriptor(hostName);
- }else{
- List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
- Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
- for (String hostDescName : applicationDescriptors.keySet()) {
- registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
- }
- Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
- HostScheduler hostScheduler = aClass.newInstance();
+ } else {
+ List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+ Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+ for (String hostDescName : applicationDescriptors.keySet()) {
+ registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+ }
+ Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
hostDescription = hostScheduler.schedule(registeredHosts);
- hostName = hostDescription.getType().getHostName();
+ hostName = hostDescription.getType().getHostName();
}
- if(hostDescription == null){
- throw new GFacException("Error executing the job as the host is not registered " + hostName);
+ if (hostDescription == null) {
+ throw new GFacException("Error executing the job as the host is not registered " + hostName);
}
ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -283,6 +282,8 @@ public class BetterGfacImpl implements GFac {
public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
// We need to check whether this job is submitted as a part of a large workflow. If yes,
// we need to setup workflow tracking listerner.
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
String workflowInstanceID = null;
if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
// This mean we need to register workflow tracking listener.
@@ -291,11 +292,11 @@ public class BetterGfacImpl implements GFac {
}
// Register log event listener. This is required in all scenarios.
jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
- schedule(jobExecutionContext);
+ launch(jobExecutionContext);
return true;
}
- private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider which handles
// the job.
String experimentID = jobExecutionContext.getExperimentID();
@@ -303,27 +304,33 @@ public class BetterGfacImpl implements GFac {
Scheduler.schedule(jobExecutionContext);
// Executing in handlers in the order as they have configured in GFac configuration
- invokeInFlowHandlers(jobExecutionContext);
+ // Handlers are never skipped here: if a handler must not run again during a re-run,
+ // it has to implement that check itself.
+ int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ if(stateVal >=2){
+ reInvokeInFlowHandlers(jobExecutionContext);
+ }else {
+ invokeInFlowHandlers(jobExecutionContext); // to keep the consistency we always try to re-run to avoid complexity
+ }
// if (experimentID != null){
// registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
// }
// After executing the in handlers provider instance should be set to job execution context.
// We get the provider instance and execute it.
- GFacProvider provider = jobExecutionContext.getProvider();
- if (provider != null) {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
- initProvider(provider, jobExecutionContext);
- executeProvider(provider, jobExecutionContext);
- disposeProvider(provider, jobExecutionContext);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
- }
- if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
- invokeOutFlowHandlers(jobExecutionContext);
+ stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ if (stateVal == 4) { // if the job is completed during resubmission we handle it here
+ reInvokeProvider(jobExecutionContext);
+ }else if(stateVal == 3){
+ invokeProvider(jobExecutionContext);
+ }else{
+ log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!");
+ log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
}
} catch (Exception e) {
try {
// we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
monitorPublisher.publish(new
ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
ExperimentState.FAILED));
@@ -335,9 +342,9 @@ public class BetterGfacImpl implements GFac {
));
monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED
+ ));
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
"NullPointerException occurred because at this point there might not have Job Created", e1, e);
@@ -348,6 +355,35 @@ public class BetterGfacImpl implements GFac {
}
}
+ private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ initProvider(provider, jobExecutionContext);
+ executeProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ }
+
+ /**
+  * Provider invocation used by the launch path when the experiment state read from ZooKeeper
+  * indicates a resubmission.
+  * <p>
+  * The steps are currently exactly those of {@link #invokeProvider(JobExecutionContext)}, so
+  * this method delegates to it instead of keeping a second copy of the same code. Any
+  * recovery-specific behaviour should be added here.
+  *
+  * @param jobExecutionContext context of the job being re-launched
+  * @throws GFacException if a provider step or an out-flow handler fails
+  */
+ private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+     invokeProvider(jobExecutionContext);
+ }
+
+
private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
try {
provider.initialize(jobExecutionContext);
@@ -358,7 +394,7 @@ public class BetterGfacImpl implements GFac {
private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
try {
- provider.execute(jobExecutionContext);
+ provider.execute(jobExecutionContext);
} catch (Exception e) {
throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
}
@@ -383,86 +419,203 @@ public class BetterGfacImpl implements GFac {
+ /**
+ * Instantiates each configured in-flow handler by class name and invokes it on the given context,
+ * publishing INHANDLERSINVOKING before the first handler and INHANDLERSINVOKED after the last one.
+ *
+ * @param jobExecutionContext context whose GFac configuration supplies the in-handler list
+ * @throws GFacException if a handler cannot be loaded, instantiated or invoked
+ */
private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- ,GfacExperimentState.INHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
- handler.invoke(jobExecutionContext);
- } catch (GFacHandlerException e) {
- throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ // The configured class name is trimmed before loading; a new handler instance is created per invocation.
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (GFacHandlerException e) {
+ // NOTE(review): only e.getCause() is forwarded, so the handler exception's own message is dropped
+ // and the cause may be null - confirm this is intended.
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ }
}
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ // NOTE(review): this also catches the GFacExceptions thrown inside the loop above and re-wraps them
+ // as "Error invoking ZK", although nothing in this method calls ZooKeeper directly - confirm.
+ throw new GFacException("Error invoking ZK", e);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- ,GfacExperimentState.INHANDLERSINVOKED));
}
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- List<GFacHandlerConfig> handlers = null;
- if(gFacConfiguration != null){
- handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
- }else {
- try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- log.error("Error constructing job execution context during outhandler invocation");
- throw new GFacException(e);
+ /**
+ * Variant of the in-flow handler invocation that first reads the experiment state value stored in
+ * ZooKeeper and only runs the handlers when that value is 8 or -1. INHANDLERSINVOKED is published
+ * afterwards in either case.
+ *
+ * @param jobExecutionContext context whose GFac configuration supplies the in-handler list
+ * @throws GFacException if the state lookup fails or a handler cannot be loaded, instantiated or invoked
+ */
+ private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+ try {
+ int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ // -1 is what getZKExperimentStateValue returns when no state is stored.
+ // NOTE(review): 8 is COMPLETED in the GfacExperimentState enum added under gfac/core/utils but was
+ // UNKNOWN in the removed gfac/workspace/experiment enum - confirm which state this guard should match.
+ if (stateVal == 8 || stateVal == -1) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (GFacHandlerException e) {
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ }
+ }
}
- schedule(jobExecutionContext);
+ // Published even when the guard above skipped the handlers.
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ // NOTE(review): handler failures thrown above are re-wrapped here under the same "Error invoking ZK" message.
+ throw new GFacException("Error invoking ZK", e);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+
+ /**
+ * Runs the configured out-flow handlers and then publishes COMPLETED for the experiment, the task
+ * and the internal GFac state, but only when the state value read from ZooKeeper is in [0, 6).
+ * For any other value the method returns without doing anything.
+ *
+ * @param jobExecutionContext context of the job; rebuilt through createJEC and passed to launch when it
+ * carries no GFac configuration
+ * @throws GFacException if the context cannot be rebuilt or a handler cannot be loaded, instantiated or invoked
+ */
+ public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ int stateVal = -1;
+ try {
+ stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+ // NOTE(review): lookup failures are only printed; stateVal then stays -1 and the whole method is a no-op.
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (stateVal >= 0 && stateVal < 6) {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ // NOTE(review): handlers is not assigned on this path, so the for loop below would throw a
+ // NullPointerException - confirm and read the list from the rebuilt context.
+ launch(jobExecutionContext);
}
- try {
- handler.invoke(jobExecutionContext);
- } catch (Exception e) {
- // TODO: Better error reporting.
- throw new GFacException("Error Executing a OutFlow Handler", e);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ // Still inside the loop: OUTHANDLERSINVOKED is published once per handler.
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors do not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
+ }
- // At this point all the execution is finished so we update the task and experiment statuses.
- // Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
- // Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
- ));
-
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+ /**
+  * Runs the configured out-flow handlers and then publishes COMPLETED for the experiment, the task
+  * and the internal GFac state.
+  * <p>
+  * Nothing happens unless the experiment state value read from ZooKeeper is in the range [0, 6).
+  * A failed lookup is logged and treated like "no state" (-1), so the method then returns silently.
+  *
+  * @param jobExecutionContext context of the job; when it carries no GFac configuration it is rebuilt
+  *                            through createJEC and passed to launch before the handlers run
+  * @throws GFacException if the context cannot be rebuilt, no GFac configuration is available
+  *                       afterwards, or a handler cannot be loaded, instantiated or invoked
+  */
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+     int stateVal = -1;
+     try {
+         stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+     } catch (ApplicationSettingsException e) {
+         log.error("Error reading the experiment state from ZooKeeper", e);
+     } catch (KeeperException e) {
+         log.error("Error reading the experiment state from ZooKeeper", e);
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack can still observe the interruption.
+         Thread.currentThread().interrupt();
+         log.error("Interrupted while reading the experiment state from ZooKeeper", e);
+     }
+     // NOTE(review): 6 is OUTHANDLERSINVOKING in the GfacExperimentState enum under gfac/core/utils;
+     // presumably the guard means "out-flow handlers not started yet" - confirm before naming the constant.
+     if (stateVal < 0 || stateVal >= 6) {
+         return;
+     }
+
+     GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+     if (gFacConfiguration == null) {
+         try {
+             jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+         } catch (Exception e) {
+             log.error("Error constructing job execution context during outhandler invocation");
+             throw new GFacException(e);
+         }
+         launch(jobExecutionContext);
+         // Previously the handler list stayed null on this path and the loop below failed with a
+         // NullPointerException; take the configuration from the rebuilt context instead.
+         // NOTE(review): confirm launch(...) does not already run the out-flow handlers itself.
+         gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+     }
+     if (gFacConfiguration == null) {
+         throw new GFacException(new IllegalStateException(
+                 "No GFac configuration available to resolve the out-flow handlers"));
+     }
+     List<GFacHandlerConfig> handlers = gFacConfiguration.getOutHandlers();
+
+     monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+     for (GFacHandlerConfig handlerClassName : handlers) {
+         GFacHandler handler;
+         try {
+             Class<? extends GFacHandler> handlerClass =
+                     Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+             handler = handlerClass.newInstance();
+             handler.initProperties(handlerClassName.getProperties());
+         } catch (ClassNotFoundException e) {
+             log.error(e.getMessage());
+             throw new GFacException("Cannot load handler class " + handlerClassName, e);
+         } catch (InstantiationException e) {
+             log.error(e.getMessage());
+             throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+         } catch (IllegalAccessException e) {
+             log.error(e.getMessage());
+             throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+         }
+         try {
+             handler.invoke(jobExecutionContext);
+         } catch (Exception e) {
+             // TODO: Better error reporting.
+             throw new GFacException("Error Executing a OutFlow Handler", e);
+         }
+         // Kept inside the loop as before: OUTHANDLERSINVOKED is published once per handler.
+         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+     }
+
+     // At this point all the execution is finished so we update the task and experiment statuses.
+     // Handler authors do not have to worry about updating experiment or task statuses.
+     monitorPublisher.publish(new ExperimentStatusChangeRequest(
+             new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.COMPLETED));
+     // Updating the task status if there's any task associated
+     monitorPublisher.publish(new TaskStatusChangeRequest(
+             new TaskIdentity(jobExecutionContext.getExperimentID(),
+                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                     jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED));
+     monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index a6908ba..5bc789c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,12 +58,11 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 24b3989..3933976 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -22,14 +22,11 @@ package org.apache.airavata.gfac.core.monitor;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +39,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
private ZooKeeper zk;
- private Integer mutex = -1;
+ private static Integer mutex = -1;
@Subscribe
public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws KeeperException, InterruptedException, ApplicationSettingsException {
@@ -73,16 +70,23 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
} catch (IOException e) {
e.printStackTrace();
}
+ Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if(state == null) {
+ // state znode has to be created
+ zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }else {
+ zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+ }
switch (statusChangeRequest.getState()) {
case COMPLETED:
- zk.delete(experimentPath, exists.getVersion());
+// ZKUtil.deleteRecursive(zk,experimentPath);
break;
case FAILED:
- zk.delete(experimentPath, exists.getVersion());
+ ZKUtil.deleteRecursive(zk,experimentPath);
break;
default:
- zk.setData(experimentPath, (statusChangeRequest.getMonitorID().getJobID() +
- "," + statusChangeRequest.getMonitorID().getWorkflowNodeID()).getBytes(), exists.getVersion());
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index 50a60de..5f7f2c2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.core.monitor.state;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
public class GfacExperimentStateChangeRequest {
private GfacExperimentState state;
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 7013f3e..f67b592 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -32,6 +32,8 @@ import java.util.*;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.Constants;
@@ -48,6 +50,8 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.schemas.gfac.*;
import org.apache.axiom.om.OMElement;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -658,5 +662,22 @@ public class GFacUtils {
return stringObjectHashMap;
}
+ /**
+  * Reads the experiment state string through {@code AiravataZKUtils.getExpState} and maps it to a
+  * {@link GfacExperimentState}.
+  *
+  * @param zk                  ZooKeeper client passed on to the lookup
+  * @param jobExecutionContext supplies the experiment ID and task ID used for the lookup
+  * @return the matching state, or {@code null} when the lookup yields no value or the value maps
+  *         to no constant
+  * @throws ApplicationSettingsException propagated from the lookup
+  * @throws KeeperException              propagated from the lookup
+  * @throws InterruptedException         propagated from the lookup
+  */
+ public static GfacExperimentState getZKExperimentState(ZooKeeper zk,JobExecutionContext jobExecutionContext)
+         throws ApplicationSettingsException, KeeperException, InterruptedException {
+     String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
+             jobExecutionContext.getTaskData().getTaskID());
+     if (expState == null) {
+         // Integer.parseInt(null) throws NumberFormatException; report a missing state the same way
+         // findByValue reports an unknown value.
+         return null;
+     }
+     return GfacExperimentState.findByValue(Integer.parseInt(expState));
+ }
+
+ /**
+  * Reads the experiment state string through {@code AiravataZKUtils.getExpState} and returns it as an int.
+  *
+  * @param zk                  ZooKeeper client passed on to the lookup
+  * @param jobExecutionContext supplies the experiment ID and task ID used for the lookup
+  * @return the parsed state value, or -1 when the lookup yields {@code null}
+  * @throws ApplicationSettingsException propagated from the lookup
+  * @throws KeeperException              propagated from the lookup
+  * @throws InterruptedException         propagated from the lookup
+  */
+ public static int getZKExperimentStateValue(ZooKeeper zk, JobExecutionContext jobExecutionContext)
+         throws ApplicationSettingsException, KeeperException, InterruptedException {
+     final String storedState = AiravataZKUtils.getExpState(
+             zk,
+             jobExecutionContext.getExperimentID(),
+             jobExecutionContext.getTaskData().getTaskID());
+     // -1 marks "no state available".
+     return storedState == null ? -1 : Integer.parseInt(storedState);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
new file mode 100644
index 0000000..db2cab0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.gfac.core.utils;
+
+
+/**
+ * Internal GFac experiment states, each paired with the integer value returned by {@link #getValue()}.
+ * <p>
+ * NOTE(review): the file header still says "Autogenerated by Thrift Compiler", but this type does not
+ * implement TEnum - presumably it is maintained by hand now; confirm before regenerating it.
+ */
+public enum GfacExperimentState {
+    LAUNCHED(0),
+    ACCEPTED(1),
+    INHANDLERSINVOKING(2),
+    INHANDLERSINVOKED(3),
+    PROVIDERINVOKING(4),
+    PROVIDERINVOKED(5),
+    OUTHANDLERSINVOKING(6),
+    OUTHANDLERSINVOKED(7),
+    COMPLETED(8),
+    FAILED(9),
+    UNKNOWN(10);
+
+    private final int value;
+
+    private GfacExperimentState(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Get the integer value of this enum value.
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Find the enum constant by its integer value.
+     *
+     * @param value integer value to look up
+     * @return the constant whose {@link #getValue()} equals {@code value}, or null if there is none.
+     */
+    public static GfacExperimentState findByValue(int value) {
+        // The former hard-coded switch still used the numbering from before LAUNCHED and ACCEPTED
+        // existed (0 -> INHANDLERSINVOKING ... 8 -> UNKNOWN), so it disagreed with getValue().
+        // Deriving the answer from the constants keeps both directions in sync.
+        for (GfacExperimentState state : values()) {
+            if (state.value == value) {
+                return state;
+            }
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
deleted file mode 100644
index 8d06aad..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
+++ /dev/null
@@ -1,82 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
-@SuppressWarnings("all") public enum GfacExperimentState implements org.apache.thrift.TEnum {
- INHANDLERSINVOKING(0),
- INHANDLERSINVOKED(1),
- PROVIDERINVOKING(2),
- PROVIDERINVOKED(3),
- OUTHANDLERSINVOKING(4),
- OUTHANDLERSINVOKED(5),
- COMPLETED(6),
- FAILED(7),
- UNKNOWN(8);
-
- private final int value;
-
- private GfacExperimentState(int value) {
- this.value = value;
- }
-
- /**
- * Get the integer value of this enum value, as defined in the Thrift IDL.
- */
- public int getValue() {
- return value;
- }
-
- /**
- * Find a the enum type by its integer value, as defined in the Thrift IDL.
- * @return null if the value is not found.
- */
- public static GfacExperimentState findByValue(int value) {
- switch (value) {
- case 0:
- return INHANDLERSINVOKING;
- case 1:
- return INHANDLERSINVOKED;
- case 2:
- return PROVIDERINVOKING;
- case 3:
- return PROVIDERINVOKED;
- case 4:
- return OUTHANDLERSINVOKING;
- case 5:
- return OUTHANDLERSINVOKED;
- case 6:
- return COMPLETED;
- case 7:
- return FAILED;
- case 8:
- return UNKNOWN;
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
deleted file mode 100644
index 3b9273d..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
+++ /dev/null
@@ -1,516 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class GfacExperimentStatus implements org.apache.thrift.TBase<GfacExperimentStatus, GfacExperimentStatus._Fields>, java.io.Serializable, Cloneable, Comparable<GfacExperimentStatus> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GfacExperimentStatus");
-
- private static final org.apache.thrift.protocol.TField GFAC_EXPERIMENT_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("gfacExperimentState", org.apache.thrift.protocol.TType.I32, (short)1);
- private static final org.apache.thrift.protocol.TField TIME_OF_STATE_CHANGE_FIELD_DESC = new org.apache.thrift.protocol.TField("timeOfStateChange", org.apache.thrift.protocol.TType.I64, (short)2);
-
- private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
- static {
- schemes.put(StandardScheme.class, new GfacExperimentStatusStandardSchemeFactory());
- schemes.put(TupleScheme.class, new GfacExperimentStatusTupleSchemeFactory());
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentState gfacExperimentState; // required
- public long timeOfStateChange; // optional
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- /**
- *
- * @see GfacExperimentState
- */
- GFAC_EXPERIMENT_STATE((short)1, "gfacExperimentState"),
- TIME_OF_STATE_CHANGE((short)2, "timeOfStateChange");
-
- private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
- static {
- for (_Fields field : EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // GFAC_EXPERIMENT_STATE
- return GFAC_EXPERIMENT_STATE;
- case 2: // TIME_OF_STATE_CHANGE
- return TIME_OF_STATE_CHANGE;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- public static _Fields findByName(String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final String _fieldName;
-
- _Fields(short thriftId, String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __TIMEOFSTATECHANGE_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
- private _Fields optionals[] = {_Fields.TIME_OF_STATE_CHANGE};
- public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.GFAC_EXPERIMENT_STATE, new org.apache.thrift.meta_data.FieldMetaData("gfacExperimentState", org.apache.thrift.TFieldRequirementType.REQUIRED,
- new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, GfacExperimentState.class)));
- tmpMap.put(_Fields.TIME_OF_STATE_CHANGE, new org.apache.thrift.meta_data.FieldMetaData("timeOfStateChange", org.apache.thrift.TFieldRequirementType.OPTIONAL,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
- metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GfacExperimentStatus.class, metaDataMap);
- }
-
- public GfacExperimentStatus() {
- }
-
- public GfacExperimentStatus(
- GfacExperimentState gfacExperimentState)
- {
- this();
- this.gfacExperimentState = gfacExperimentState;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public GfacExperimentStatus(GfacExperimentStatus other) {
- __isset_bitfield = other.__isset_bitfield;
- if (other.isSetGfacExperimentState()) {
- this.gfacExperimentState = other.gfacExperimentState;
- }
- this.timeOfStateChange = other.timeOfStateChange;
- }
-
- public GfacExperimentStatus deepCopy() {
- return new GfacExperimentStatus(this);
- }
-
- @Override
- public void clear() {
- this.gfacExperimentState = null;
- setTimeOfStateChangeIsSet(false);
- this.timeOfStateChange = 0;
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentState getGfacExperimentState() {
- return this.gfacExperimentState;
- }
-
- /**
- *
- * @see GfacExperimentState
- */
- public GfacExperimentStatus setGfacExperimentState(GfacExperimentState gfacExperimentState) {
- this.gfacExperimentState = gfacExperimentState;
- return this;
- }
-
- public void unsetGfacExperimentState() {
- this.gfacExperimentState = null;
- }
-
- /** Returns true if field gfacExperimentState is set (has been assigned a value) and false otherwise */
- public boolean isSetGfacExperimentState() {
- return this.gfacExperimentState != null;
- }
-
- public void setGfacExperimentStateIsSet(boolean value) {
- if (!value) {
- this.gfacExperimentState = null;
- }
- }
-
- public long getTimeOfStateChange() {
- return this.timeOfStateChange;
- }
-
- public GfacExperimentStatus setTimeOfStateChange(long timeOfStateChange) {
- this.timeOfStateChange = timeOfStateChange;
- setTimeOfStateChangeIsSet(true);
- return this;
- }
-
- public void unsetTimeOfStateChange() {
- __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
- }
-
- /** Returns true if field timeOfStateChange is set (has been assigned a value) and false otherwise */
- public boolean isSetTimeOfStateChange() {
- return EncodingUtils.testBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
- }
-
- public void setTimeOfStateChangeIsSet(boolean value) {
- __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID, value);
- }
-
- public void setFieldValue(_Fields field, Object value) {
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- if (value == null) {
- unsetGfacExperimentState();
- } else {
- setGfacExperimentState((GfacExperimentState)value);
- }
- break;
-
- case TIME_OF_STATE_CHANGE:
- if (value == null) {
- unsetTimeOfStateChange();
- } else {
- setTimeOfStateChange((Long)value);
- }
- break;
-
- }
- }
-
- public Object getFieldValue(_Fields field) {
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- return getGfacExperimentState();
-
- case TIME_OF_STATE_CHANGE:
- return Long.valueOf(getTimeOfStateChange());
-
- }
- throw new IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new IllegalArgumentException();
- }
-
- switch (field) {
- case GFAC_EXPERIMENT_STATE:
- return isSetGfacExperimentState();
- case TIME_OF_STATE_CHANGE:
- return isSetTimeOfStateChange();
- }
- throw new IllegalStateException();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == null)
- return false;
- if (that instanceof GfacExperimentStatus)
- return this.equals((GfacExperimentStatus)that);
- return false;
- }
-
- public boolean equals(GfacExperimentStatus that) {
- if (that == null)
- return false;
-
- boolean this_present_gfacExperimentState = true && this.isSetGfacExperimentState();
- boolean that_present_gfacExperimentState = true && that.isSetGfacExperimentState();
- if (this_present_gfacExperimentState || that_present_gfacExperimentState) {
- if (!(this_present_gfacExperimentState && that_present_gfacExperimentState))
- return false;
- if (!this.gfacExperimentState.equals(that.gfacExperimentState))
- return false;
- }
-
- boolean this_present_timeOfStateChange = true && this.isSetTimeOfStateChange();
- boolean that_present_timeOfStateChange = true && that.isSetTimeOfStateChange();
- if (this_present_timeOfStateChange || that_present_timeOfStateChange) {
- if (!(this_present_timeOfStateChange && that_present_timeOfStateChange))
- return false;
- if (this.timeOfStateChange != that.timeOfStateChange)
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- @Override
- public int compareTo(GfacExperimentStatus other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- lastComparison = Boolean.valueOf(isSetGfacExperimentState()).compareTo(other.isSetGfacExperimentState());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetGfacExperimentState()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gfacExperimentState, other.gfacExperimentState);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = Boolean.valueOf(isSetTimeOfStateChange()).compareTo(other.isSetTimeOfStateChange());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetTimeOfStateChange()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timeOfStateChange, other.timeOfStateChange);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("GfacExperimentStatus(");
- boolean first = true;
-
- sb.append("gfacExperimentState:");
- if (this.gfacExperimentState == null) {
- sb.append("null");
- } else {
- sb.append(this.gfacExperimentState);
- }
- first = false;
- if (isSetTimeOfStateChange()) {
- if (!first) sb.append(", ");
- sb.append("timeOfStateChange:");
- sb.append(this.timeOfStateChange);
- first = false;
- }
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- if (gfacExperimentState == null) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'gfacExperimentState' was not present! Struct: " + toString());
- }
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class GfacExperimentStatusStandardSchemeFactory implements SchemeFactory {
- public GfacExperimentStatusStandardScheme getScheme() {
- return new GfacExperimentStatusStandardScheme();
- }
- }
-
- private static class GfacExperimentStatusStandardScheme extends StandardScheme<GfacExperimentStatus> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // GFAC_EXPERIMENT_STATE
- if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
- struct.setGfacExperimentStateIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // TIME_OF_STATE_CHANGE
- if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
- struct.timeOfStateChange = iprot.readI64();
- struct.setTimeOfStateChangeIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (struct.gfacExperimentState != null) {
- oprot.writeFieldBegin(GFAC_EXPERIMENT_STATE_FIELD_DESC);
- oprot.writeI32(struct.gfacExperimentState.getValue());
- oprot.writeFieldEnd();
- }
- if (struct.isSetTimeOfStateChange()) {
- oprot.writeFieldBegin(TIME_OF_STATE_CHANGE_FIELD_DESC);
- oprot.writeI64(struct.timeOfStateChange);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class GfacExperimentStatusTupleSchemeFactory implements SchemeFactory {
- public GfacExperimentStatusTupleScheme getScheme() {
- return new GfacExperimentStatusTupleScheme();
- }
- }
-
- private static class GfacExperimentStatusTupleScheme extends TupleScheme<GfacExperimentStatus> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- TTupleProtocol oprot = (TTupleProtocol) prot;
- oprot.writeI32(struct.gfacExperimentState.getValue());
- BitSet optionals = new BitSet();
- if (struct.isSetTimeOfStateChange()) {
- optionals.set(0);
- }
- oprot.writeBitSet(optionals, 1);
- if (struct.isSetTimeOfStateChange()) {
- oprot.writeI64(struct.timeOfStateChange);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
- TTupleProtocol iprot = (TTupleProtocol) prot;
- struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
- struct.setGfacExperimentStateIsSet(true);
- BitSet incoming = iprot.readBitSet(1);
- if (incoming.get(0)) {
- struct.timeOfStateChange = iprot.readI64();
- struct.setTimeOfStateChangeIsSet(true);
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
deleted file mode 100644
index 20099f2..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
+++ /dev/null
@@ -1,59 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class gfacDataModelConstants {
-
- public static final String DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
-
- public static final String DEFAULT_PROJECT_NAME = "DEFAULT";
-
- public static final String SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE";
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 15e2c63..e3705ac 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Random;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.error.LaunchValidationException;
@@ -73,8 +74,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
public OrchestratorServerHandler() {
// registering with zk
try {
- String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
- + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String zkhostPort = AiravataZKUtils.getZKhostPort();
String airavataServerHostPort = ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+ ":" + ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
try {
@@ -124,6 +124,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
+
/**
* * After creating the experiment Data user have the
* * experimentID as the handler to the experiment, during the launchExperiment
http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 23089aa..4629b15 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -21,9 +21,11 @@
package org.apache.airavata.orchestrator.core.impl;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
@@ -74,6 +76,7 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
mutex.wait();
}
}
+ AiravataZKUtils.
String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
List<String> children = zk.getChildren(gfacServer, this);
@@ -92,6 +95,9 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
if (exists1 == null) {
zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+
+ zk.create(newExpNode + File.separator + "state", GfacExperimentState.LAUNCHED.toString().getBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
logger.info("ExperimentID: " + experimentID + " taskID: " + taskID + " is re-running due to gfac failure");
}
[4/5] git commit: Merge branch 'zk-work'
Posted by la...@apache.org.
Merge branch 'zk-work'
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34816536
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34816536
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34816536
Branch: refs/heads/master
Commit: 34816536df1c384aca6bd4d39bc0f6c778c1daa0
Parents: f723d49 e86504a
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 14:04:11 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 14:04:11 2014 -0400
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 6 +
.../server/handler/AiravataServerHandler.java | 94 ++-
.../handler/ApplicationCatalogHandler.java | 2 +-
.../airavata/api/server/util/Constants.java | 2 -
.../client/samples/CreateLaunchExperiment.java | 226 ++-----
.../airavataAPI.thrift | 1 +
modules/commons/utils/pom.xml | 12 +-
.../airavata/common/utils/AiravataZKUtils.java | 59 ++
.../apache/airavata/common/utils/Constants.java | 15 +
.../main/resources/airavata-server.properties | 22 +-
modules/distribution/server/pom.xml | 6 +
.../server/src/main/assembly/bin-assembly.xml | 3 +-
modules/gfac/airavata-gfac-service/pom.xml | 4 +-
.../apache/airavata/gfac/server/GfacServer.java | 11 +-
.../airavata/gfac/server/GfacServerHandler.java | 102 ++-
.../apache/airavata/gfac/util/Constants.java | 26 -
modules/gfac/gfac-core/pom.xml | 6 +
.../gfac/core/context/JobExecutionContext.java | 3 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 657 +++++++++++++++++++
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 7 +-
.../core/monitor/GfacInternalStatusUpdator.java | 109 +++
.../airavata/gfac/core/monitor/MonitorID.java | 6 +-
.../state/GfacExperimentStateChangeRequest.java | 71 ++
.../gfac/core/provider/AbstractProvider.java | 6 +-
.../airavata/gfac/core/utils/GFacUtils.java | 21 +
.../gfac/core/utils/GfacExperimentState.java | 81 +++
.../generate-gfac-stubs.sh | 2 +
.../gfacDataModel.thrift | 55 ++
.../orchestrator/server/OrchestratorServer.java | 1 +
.../server/OrchestratorServerHandler.java | 142 +++-
.../util/OrchestratorRecoveryHandler.java | 107 +++
modules/orchestrator/orchestrator-core/pom.xml | 6 +
.../core/context/OrchestratorContext.java | 15 +
.../core/gfac/GFacClientFactory.java | 2 +-
.../core/impl/GFACServiceJobSubmitter.java | 80 ++-
.../cpi/impl/AbstractOrchestrator.java | 14 +
.../cpi/impl/SimpleOrchestratorImpl.java | 1 +
37 files changed, 1737 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
[5/5] git commit: adding new type of handler
Posted by la...@apache.org.
adding new type of handler
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d56dd443
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d56dd443
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d56dd443
Branch: refs/heads/master
Commit: d56dd443e6fb4242d475e15264e08e9cab2edc59
Parents: 3481653
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 14:05:28 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 14:05:28 2014 -0400
----------------------------------------------------------------------
.../handler/AbstractRecoverableHandler.java | 65 ++++++++++++++++++++
.../core/handler/GFacRecoverableHandler.java | 44 +++++++++++++
2 files changed, 109 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d56dd443/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
new file mode 100644
index 0000000..0c6810b
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+
+public abstract class AbstractRecoverableHandler implements GFacRecoverableHandler {
+ protected Registry registry = null;
+
+ protected MonitorPublisher publisher = null;
+
+ protected AbstractRecoverableHandler() {
+ publisher = GFacImpl.getMonitorPublisher(); // This will not be null because it is initialized in GFacImpl
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ registry = jobExecutionContext.getRegistry();
+ if(registry == null){
+ try {
+ registry = RegistryFactory.getDefaultRegistry();
+ } catch (RegistryException e) {
+ throw new GFacHandlerException("unable to create registry instance", e);
+ }
+ }
+ }
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public Registry getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(Registry registry) {
+ this.registry = registry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d56dd443/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
new file mode 100644
index 0000000..fef6b76
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+
+/**
+ * Handler type for implementing stateful operations. We recommend using the
+ * ZK (ZooKeeper) client to store and retrieve the state of the handler
+ * implementation. At the framework level ZK is used to decide whether a
+ * handler ran successfully, so the execution details of each handler can be
+ * found under the following ZK path:
+ * /gfac-experiment/<gfac-node-name>/experimentId+taskId/full-qualified-handlername/state
+ * ex: /gfac-experiment/gfac-node0/echoExperiment_2c6c11b8-dea0-4ec8-9832-f3e69fe2e6bb+IDontNeedaNode_682faa66-6218-4897-9271-656bfb8b2bd1/org.apache.airavata.gfac.handlers.Test/state
+ */
+public interface GFacRecoverableHandler extends GFacHandler {
+
+ /**
+ * Implements the recovery part of a stateful handler.
+ * If a handler that has already run does not need to be recovered, either
+ * implement GFacHandler (or the abstract GFac handler base class) instead,
+ * or leave this method empty.
+ * @param jobExecutionContext the job execution context handed to the handler for recovery
+ */
+ public void recover(JobExecutionContext jobExecutionContext);
+
+}
[2/5] git commit: committing the initial version of zk work with
resubmitting all the failed jobs to the available gfac cluster nodes
Posted by la...@apache.org.
committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/362da4e8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/362da4e8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/362da4e8
Branch: refs/heads/master
Commit: 362da4e88c5ab8ddeba234b3ad7c5f829477c272
Parents: 2bcadf5
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 00:44:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 00:44:33 2014 -0400
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 6 +
.../server/handler/AiravataServerHandler.java | 94 +++-
.../handler/ApplicationCatalogHandler.java | 2 +-
.../airavata/api/server/util/Constants.java | 2 -
.../client/samples/CreateLaunchExperiment.java | 226 +++-----
.../airavataAPI.thrift | 1 +
.../airavata/client/tools/DocumentCreator.java | 2 +-
.../apache/airavata/common/utils/Constants.java | 15 +
.../main/resources/airavata-server.properties | 22 +-
modules/distribution/server/pom.xml | 6 +
.../server/src/main/assembly/bin-assembly.xml | 3 +-
modules/gfac/airavata-gfac-service/pom.xml | 4 +-
.../apache/airavata/gfac/server/GfacServer.java | 11 +-
.../airavata/gfac/server/GfacServerHandler.java | 93 +++-
.../apache/airavata/gfac/util/Constants.java | 26 -
modules/gfac/gfac-core/pom.xml | 6 +
.../gfac/core/context/JobExecutionContext.java | 3 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 504 ++++++++++++++++++
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 6 +
.../core/monitor/GfacInternalStatusUpdator.java | 105 ++++
.../airavata/gfac/core/monitor/MonitorID.java | 6 +-
.../state/GfacExperimentStateChangeRequest.java | 71 +++
.../gfac/core/provider/AbstractProvider.java | 6 +-
.../experiment/GfacExperimentState.java | 82 +++
.../experiment/GfacExperimentStatus.java | 516 +++++++++++++++++++
.../experiment/gfacDataModelConstants.java | 59 +++
.../generate-gfac-stubs.sh | 2 +
.../gfacDataModel.thrift | 55 ++
.../orchestrator/server/OrchestratorServer.java | 1 +
.../server/OrchestratorServerHandler.java | 141 ++++-
.../util/OrchestratorRecoveryHandler.java | 107 ++++
modules/orchestrator/orchestrator-core/pom.xml | 6 +
.../core/context/OrchestratorContext.java | 15 +
.../core/gfac/GFacClientFactory.java | 2 +-
.../core/impl/GFACServiceJobSubmitter.java | 74 ++-
.../cpi/impl/AbstractOrchestrator.java | 14 +
.../cpi/impl/SimpleOrchestratorImpl.java | 1 +
37 files changed, 2052 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index ff21028..1a89fcc 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -66,6 +66,12 @@
<artifactId>slf4j-log4j12</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
+ <!-- zookeeper dependencies -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index be35568..a8f4297 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.api.server.handler;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.error.*;
import org.apache.airavata.model.workspace.Project;
@@ -35,20 +36,91 @@ import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.thrift.TException;
+import org.apache.tools.ant.types.selectors.FileSelector;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
-public class AiravataServerHandler implements Airavata.Iface {
-
- private Registry registry;
- private OrchestratorService.Client orchestratorClient;
+public class AiravataServerHandler implements Airavata.Iface, Watcher {
private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
-
+ private Registry registry;
+ private OrchestratorService.Client orchestratorClient;
+
+ private ZooKeeper zk;
+ private static Integer mutex = -1;
+
+
+ public AiravataServerHandler() {
+ try {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT);
+ try {
+ zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data
+ String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server");
+ String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server");
+ String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+ String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+
+ synchronized (mutex) {
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ Stat zkStat = zk.exists(apiServer, false);
+ if (zkStat == null) {
+ zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
+ zkStat = zk.exists(instantNode, false);
+ if (zkStat == null) {
+ zk.create(instantNode,
+ airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL); // other component will watch these childeren creation deletion to monitor the status of the node
+ logger.info("Successfully created airavata-server node");
+ }
+
+ zkStat = zk.exists(OrchServer, false);
+ if (zkStat == null) {
+ zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ logger.info("Successfully created orchestrator-server node");
+ }
+ zkStat = zk.exists(gfacServer, false);
+ if (zkStat == null) {
+ zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ logger.info("Successfully created gfac-server node");
+ }
+ zkStat = zk.exists(gfacServer, false);
+ if (zkStat == null) {
+ zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ logger.info("Successfully created gfac-server node");
+ }
+ logger.info("Finished starting ZK: " + zk);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ }
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ }
+ }
+
+ synchronized public void process(WatchedEvent watchedEvent) {
+ synchronized (mutex) {
+ mutex.notify();
+ }
+ }
/**
* Query Airavata to fetch the API version
@@ -905,8 +977,8 @@ public class AiravataServerHandler implements Airavata.Iface {
}
private OrchestratorService.Client getOrchestratorClient() {
- final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
- final String serverHost = ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_HOST, null);
+ final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
+ final String serverHost = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_HOST, null);
return orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
index b3ce8fb..efec768 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
@@ -64,7 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ApplicationCatalogHandler implements Iface {
- private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationCatalogHandler.class);
AiravataRegistry2 registry;
private AiravataRegistry2 getRegistry() throws RegException, AiravataConfigurationException{
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
index eb6a119..92eac88 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
@@ -27,6 +27,4 @@ public class Constants {
public static final String APP_CATALOG_SERVER_PORT = "app.catalog.server.port";
public static final String APP_CATALOG_SERVER_HOST = "app.catalog.server.host";
public static final String API_SERVER_MIN_THREADS = "apiserver.server.min.threads";
- public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
- public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c8c9235..170fb99 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,130 +61,26 @@ public class CreateLaunchExperiment {
AiravataUtils.setExecutionAsClient();
final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavata.getAPIVersion());
- addDescriptors();
+// addDescriptors();
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createExperimentForTrestles(airavata);
// final String expId = createExperimentForStampede(airavata);
- final String expId = createExperimentForLocalHost(airavata);
+ for (int i = 0; i < 1; i++) {
+ final String expId = createExperimentForLocalHost(airavata);
// final String expId = createExperimentForLonestar(airavata);
// final String expId = createExperimentWRFTrestles(airavata);
- System.out.println("Experiment ID : " + expId);
+ System.out.println("Experiment ID : " + expId);
// updateExperiment(airavata, expId);
- launchExperiment(airavata, expId);
- System.out.println("Launched successfully");
- List<Experiment> experiments = getExperimentsForUser(airavata, "admin");
- List<ExperimentSummary> searchedExps1 = searchExperimentsByName(airavata, "admin", "echo");
- List<ExperimentSummary> searchedExps2 = searchExperimentsByDesc(airavata, "admin", "Echo");
- List<ExperimentSummary> searchedExps3 = searchExperimentsByApplication(airavata, "admin", "cho");
- List<Project> projects = getAllUserProject(airavata, "admin");
- List<Project> searchProjects1 = searchProjectsByProjectName(airavata, "admin", "project");
- List<Project> searchProjects2 = searchProjectsByProjectDesc(airavata, "admin", "test");
- for (Experiment exp : experiments){
- System.out.println(" exp id : " + exp.getExperimentID());
- System.out.println("experiment Description : " + exp.getDescription()) ;
- if (exp.getExperimentStatus() != null) {
- System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
- }
- }
-
- for (ExperimentSummary exp : searchedExps1){
- System.out.println("search results by experiment name");
- System.out.println("experiment ID : " + exp.getExperimentID()) ;
- System.out.println("experiment Description : " + exp.getDescription()) ;
- if (exp.getExperimentStatus() != null) {
- System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
- }
- }
-
- for (ExperimentSummary exp : searchedExps2){
- System.out.println("search results by experiment desc");
- System.out.println("experiment ID : " + exp.getExperimentID()) ;
- if (exp.getExperimentStatus() != null) {
- System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
- }
- }
-
- for (ExperimentSummary exp : searchedExps3){
- System.out.println("search results by application");
- System.out.println("experiment ID : " + exp.getExperimentID()) ;
- if (exp.getExperimentStatus() != null) {
- System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
- }
+ launchExperiment(airavata, expId);
}
-
- for (Project pr : searchProjects1){
- System.out.println(" project id : " + pr.getProjectID());
- }
-
- for (Project pr : searchProjects2){
- System.out.println(" project id : " + pr.getProjectID());
- System.out.println(" project desc : " + pr.getDescription());
- }
-
- Thread monitor = (new Thread(){
- public void run() {
- Map<String, JobStatus> jobStatuses = null;
- while (true) {
- try {
- jobStatuses = airavata.getJobStatuses(expId);
- Set<String> strings = jobStatuses.keySet();
- for (String key : strings) {
- JobStatus jobStatus = jobStatuses.get(key);
- if(jobStatus == null){
- return;
- }else {
- if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
- System.out.println("Job completed Job ID: " + key);
- return;
- }else{
- System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());
- }
- }
- }
- ExperimentStatus experimentStatus = airavata.getExperimentStatus(expId);
- if(experimentStatus.getExperimentState().equals(ExperimentState.FAILED)){
- return;
- }
- System.out.println(experimentStatus);
- Thread.sleep(5000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
- });
- monitor.start();
- try {
- monitor.join();
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace(); // To change body of catch statement use
- // File | Settings | File Templates.
- }
-
-// System.out.println(airavata.getExperimentStatus(expId));
- List<DataObjectType> output = airavata.getExperimentOutputs(expId);
- for (DataObjectType dataObjectType : output) {
- System.out.println(dataObjectType.getKey() + " : " + dataObjectType.getType() + " : " + dataObjectType.getValue());
-
-
- }
- String clonedExpId = cloneExperiment(airavata, expId);
- System.out.println("Cloned Experiment ID : " + clonedExpId);
-// System.out.println("retrieved exp id : " + experiment.getExperimentID());
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
e.printStackTrace();
}
}
- public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException {
+ public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
try {
DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
documentCreator.createLocalHostDocs();
@@ -219,8 +115,8 @@ public class CreateLaunchExperiment {
return airavataAPI;
}
- public static String createExperimentForTrestles(Airavata.Client client) throws TException {
- try{
+ public static String createExperimentForTrestles(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -255,50 +151,50 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
- try{
+
+ public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("WRF_Namelist");
input.setType(DataType.URI);
input.setValue("/Users/raminder/Downloads/wrf_sample_inputs/namelist.input");
-
+
DataObjectType input1 = new DataObjectType();
input1.setKey("WRF_Input_File");
input1.setType(DataType.URI);
input1.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfinput_d01");
-
+
DataObjectType input2 = new DataObjectType();
input2.setKey("WRF_Boundary_File");
input2.setType(DataType.URI);
input2.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfbdy_d01");
-
+
exInputs.add(input);
exInputs.add(input1);
exInputs.add(input2);
-
+
List<DataObjectType> exOut = new ArrayList<DataObjectType>();
DataObjectType output = new DataObjectType();
output.setKey("WRF_Output");
output.setType(DataType.URI);
output.setValue("");
-
+
DataObjectType output1 = new DataObjectType();
output1.setKey("WRF_Execution_Log");
output1.setType(DataType.URI);
output1.setValue("");
-
-
+
+
exOut.add(output);
exOut.add(output1);
-
+
Experiment simpleExperiment =
ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", "WRF", exInputs);
@@ -320,35 +216,35 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
- try{
+ public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
+ try {
return client.cloneExperiment(expId, "cloneExperiment1");
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static void updateExperiment(Airavata.Client client, String expId) throws TException {
- try{
+ public static void updateExperiment(Airavata.Client client, String expId) throws TException {
+ try {
Experiment experiment = client.getExperiment(expId);
experiment.setDescription("updatedDescription");
- client.updateExperiment(expId, experiment );
- }catch (TException e) {
+ client.updateExperiment(expId, experiment);
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
- try{
+ public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -386,14 +282,14 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
- public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
- try{
+
+ public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -432,13 +328,14 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String createExperimentForStampede(Airavata.Client client) throws TException {
- try{
+
+ public static String createExperimentForStampede(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -477,13 +374,14 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static String createExperimentForLonestar(Airavata.Client client) throws TException {
- try{
+
+ public static String createExperimentForLonestar(Airavata.Client client) throws TException {
+ try {
List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
DataObjectType input = new DataObjectType();
input.setKey("echo_input");
@@ -531,15 +429,15 @@ public class CreateLaunchExperiment {
}
}
throw e;
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while creating the experiment...", e.getMessage());
throw new TException(e);
}
}
-
-
- public static void launchExperiment (Airavata.Client client, String expId)
- throws TException{
+
+
+ public static void launchExperiment(Airavata.Client client, String expId)
+ throws TException {
try {
client.launchExperiment(expId, "testToken");
} catch (ExperimentNotFoundException e) {
@@ -554,13 +452,13 @@ public class CreateLaunchExperiment {
} catch (AiravataClientException e) {
logger.error("Error occured while launching the experiment...", e.getMessage());
throw new AiravataClientException(e);
- }catch (TException e) {
+ } catch (TException e) {
logger.error("Error occured while launching the experiment...", e.getMessage());
throw new TException(e);
}
}
- public static List<Experiment> getExperimentsForUser (Airavata.Client client, String user){
+ public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
try {
return client.getAllUserExperiments(user);
} catch (AiravataSystemException e) {
@@ -569,13 +467,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<Project> getAllUserProject (Airavata.Client client, String user){
+ public static List<Project> getAllUserProject(Airavata.Client client, String user) {
try {
return client.getAllUserProjects(user);
} catch (AiravataSystemException e) {
@@ -584,13 +482,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<Project> searchProjectsByProjectName (Airavata.Client client, String user, String projectName){
+ public static List<Project> searchProjectsByProjectName(Airavata.Client client, String user, String projectName) {
try {
return client.searchProjectsByProjectName(user, projectName);
} catch (AiravataSystemException e) {
@@ -599,13 +497,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<Project> searchProjectsByProjectDesc (Airavata.Client client, String user, String desc){
+ public static List<Project> searchProjectsByProjectDesc(Airavata.Client client, String user, String desc) {
try {
return client.searchProjectsByProjectDesc(user, desc);
} catch (AiravataSystemException e) {
@@ -614,14 +512,14 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByName (Airavata.Client client, String user, String expName){
+ public static List<ExperimentSummary> searchExperimentsByName(Airavata.Client client, String user, String expName) {
try {
return client.searchExperimentsByName(user, expName);
} catch (AiravataSystemException e) {
@@ -630,13 +528,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc){
+ public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc) {
try {
return client.searchExperimentsByDesc(user, desc);
} catch (AiravataSystemException e) {
@@ -645,13 +543,13 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
}
- public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app){
+ public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app) {
try {
return client.searchExperimentsByApplication(user, app);
} catch (AiravataSystemException e) {
@@ -660,7 +558,7 @@ public class CreateLaunchExperiment {
e.printStackTrace();
} catch (AiravataClientException e) {
e.printStackTrace();
- }catch (TException e){
+ } catch (TException e) {
e.printStackTrace();
}
return null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
index 7b2ec60..9c9ec7f 100644
--- a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
+++ b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
@@ -29,6 +29,7 @@ include "airavataDataModel.thrift"
include "experimentModel.thrift"
include "workspaceModel.thrift"
include "applicationCatalogAPI.thrift"
+include "gfacDataModel.thrift"
namespace java org.apache.airavata.api
namespace php Airavata.API
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index d573da9..ffcff17 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
- applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
+ applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index bac5913..b8f999a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -31,4 +31,19 @@ public final class Constants {
public static final String GFAC_CONFIG_XML = "gfac-config.xml";
public static final String PUSH = "push";
public static final String PULL = "pull";
+ public static final String API_SERVER_PORT = "apiserver.server.port";
+ public static final String API_SERVER_HOST = "apiserver.server.host";
+ public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
+ public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
+ public static final String GFAC_SERVER_HOST = "gfac.server.host";
+ public static final String GFAC_SERVER_PORT = "gfac.server.port";
+ public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host";
+ public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port";
+ public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server";
+ public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server";
+ public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server";
+ public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments";
+ public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
+ public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
+ public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index b99c6cb..625f7f2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -276,11 +276,11 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
submitter.interval=10000
threadpool.size=10
@@ -297,7 +297,7 @@ appcatalogserver=org.apache.airavata.api.server.ApplicationCatalogServer
###---------------------------Airavata Server Configurations---------------------------###
-servers=apiserver,appcatalogserver,orchestrator
+servers=apiserver,appcatalogserver,orchestrator,gfac
#shutdown.trategy=NONE
shutdown.trategy=SELF_TERMINATE
# credential store specific parameters
@@ -323,4 +323,18 @@ app.catalog.server.host=localhost
app.catalog.server.port=8931
orchestrator.server.host=localhost
orchestrator.server.port=8940
+gfac.server.host=localhost
+gfac.server.port=8950
orchestrator.server.min.threads=30
+
+##----------------------------- Zookeeper Server Configurations ----------------------###
+
+zookeeper.server.host=localhost
+zookeeper.server.port=2181
+airavata-server=/api-server
+orchestrator-server=/orchestrator-server
+gfac-server=/gfac-server
+gfac-experiments=/gfac-experiments
+gfac-server-name=gfac-node0
+orchestrator-server-name=orch-node0
+airavata-server-name=api-node0
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 108bd32..4d51a13 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -562,7 +562,13 @@
<artifactId>jackson-annotations</artifactId>
<version>2.0.0</version>
</dependency>
+ <!-- zookeeper dependencies -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 0c3e43a..b6c5aa7 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,7 +196,7 @@
<include>org.apache.airavata:airavata-data-models:jar</include>
<include>org.apache.airavata:airavata-credential-store:jar</include>
<include>org.apache.airavata:airavata-gfac-core:jar</include>
- <include>org.apache.airavata:airavata-gfac-server:jar</include>
+ <include>org.apache.airavata:airavata-gfac-service:jar</include>
<include>org.apache.airavata:airavata-gfac-ssh:jar</include>
<include>org.apache.airavata:airavata-gfac-local:jar</include>
<include>org.apache.airavata:airavata-gfac-gsissh:jar</include>
@@ -243,6 +243,7 @@
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
+ <include>org.apache.zookeeper:zookeeper</include>
<!-- unicore start
<include>eu.unicore:ogsabes-client</include>
<include>eu.unicore:ogsabes-types</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index e57eccc..d02a658 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -40,12 +40,12 @@
<artifactId>airavata-gfac-core</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-model-utils</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
index bf6f933..f96e40f 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
@@ -20,10 +20,10 @@
*/
package org.apache.airavata.gfac.server;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.IServer;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
@@ -32,6 +32,8 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+
public class GfacServer implements IServer{
private final static Logger logger = LoggerFactory.getLogger(GfacServer.class);
@@ -50,7 +52,12 @@ public class GfacServer implements IServer{
throws Exception {
try {
final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"));
- TServerTransport serverTransport = new TServerSocket(serverPort);
+ final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null);
+
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+
+ TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
+
server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor));
new Thread() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1b8d1e8..27733f9 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,12 +20,16 @@
*/
package org.apache.airavata.gfac.server;
+import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.exception.AiravataConfigurationException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -35,28 +39,111 @@ import org.apache.airavata.registry.api.Gateway;
import org.apache.airavata.registry.api.exception.RegException;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
-public class GfacServerHandler implements GfacService.Iface {
+
+public class GfacServerHandler implements GfacService.Iface, Watcher{
private final static Logger logger = LoggerFactory.getLogger(GfacServerHandler.class);
private Registry registry;
private String registryURL;
+
private String gatewayName;
+
private String airavataUserName;
+ private ZooKeeper zk;
+
+ private boolean connected = false;
+
+ private static Integer mutex = new Integer(-1);
+
+ private MonitorPublisher publisher;
+
+
public GfacServerHandler() {
+ // registering with zk
+ try {
+ String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+ String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
+ try {
+ zk = new ZooKeeper(zkhostPort, 6000, this); // this handler is the watcher: process() signals the SyncConnected event awaited below
+ String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+ String gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+ synchronized(mutex){
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ Stat zkStat = zk.exists(gfacServer, false);
+ if (zkStat == null) {
+ zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+ String instantNode = gfacServer + File.separator + instanceId;
+ zkStat = zk.exists(instantNode, false);
+ if (zkStat == null) {
+ zk.create(instantNode,
+ airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL); // other components will watch the creation and deletion of these children to monitor the status of the node
+ }
+ zkStat = zk.exists(gfacExperiments, false);
+ if (zkStat == null) {
+ zk.create(gfacExperiments,
+ airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
+ if (zkStat == null) {
+ zk.create(gfacExperiments + File.separator + instanceId,
+ airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }else{
+ logger.error(" Zookeeper is inconsistent state !!!!!");
+ }
+ logger.info("Finished starting ZK: " + zk);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ }
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ }
try {
+ publisher = new MonitorPublisher(new EventBus());
registry = RegistryFactory.getDefaultRegistry();
setGatewayProperties();
+ BetterGfacImpl.startDaemonHandlers();
+ BetterGfacImpl.startStatusUpdators(registry,zk,publisher);
}catch (Exception e){
logger.error("Error initialising GFAC",e);
}
}
+ synchronized public void process(WatchedEvent watchedEvent) {
+ synchronized (mutex) {
+ Event.KeeperState state = watchedEvent.getState();
+ if (state == Event.KeeperState.SyncConnected) {
+ mutex.notify();
+ connected = true;
+ }
+ }
+ }
+
public String getGFACServiceVersion() throws TException {
return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
}
@@ -114,9 +201,9 @@ public class GfacServerHandler implements GfacService.Iface {
private GFac getGfac()throws TException{
try {
- return new GFacImpl(registry, null,
+ return new BetterGfacImpl(registry, null,
AiravataRegistryFactory.getRegistry(new Gateway(getGatewayName()),
- new AiravataUser(getAiravataUserName())));
+ new AiravataUser(getAiravataUserName())),zk,publisher);
} catch (RegException e) {
throw new TException("Error initializing gfac instance",e);
} catch (AiravataConfigurationException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
deleted file mode 100644
index 3e48898..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.util;
-
-public class Constants {
- public static final String GFAC_SERVER_PORT = "gfac.server.port";
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index da86055..19d5f09 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -120,6 +120,12 @@
<version>${xmlbeans.version}</version>
</dependency>
<!-- this is the dependency for amqp implementation -->
+ <!-- zookeeper dependencies -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.0</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 86f4055..170c2c8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.core.context;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,7 +38,7 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.registry.cpi.Registry;
-public class JobExecutionContext extends AbstractContext{
+public class JobExecutionContext extends AbstractContext implements Serializable{
private GFacConfiguration gfacConfiguration;
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
new file mode 100644
index 0000000..195bfc1
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.cpi;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.*;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
+import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+
+/**
+ * This is the GFac CPI class for external usage. It simply has a single method to submit a job to
+ * the resource; the data required for the job has to be stored in the registry before invoking this object.
+ */
+public class BetterGfacImpl implements GFac {
+ private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
+ public static final String ERROR_SENT = "ErrorSent";
+
+ private Registry registry;
+
+ private AiravataAPI airavataAPI;
+
+ private AiravataRegistry2 airavataRegistry2;
+
+ private ZooKeeper zk; // we are not storing zk instance in to jobExecution context
+
+ private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+
+ private static File gfacConfigFile;
+
+ private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
+
+ private static MonitorPublisher monitorPublisher;
+
+ /**
+ * Constructor for GFac
+ *
+ * @param registry
+ * @param airavataAPI
+ * @param airavataRegistry2
+ * @param zooKeeper
+ */
+ public BetterGfacImpl(Registry registry, AiravataAPI airavataAPI, AiravataRegistry2 airavataRegistry2, ZooKeeper zooKeeper,
+ MonitorPublisher publisher) {
+ this.registry = registry;
+ this.airavataAPI = airavataAPI;
+ this.airavataRegistry2 = airavataRegistry2;
+ monitorPublisher = publisher; // This is a EventBus common for gfac
+ this.zk = zooKeeper;
+ }
+
+ public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+ try {
+ String[] listenerClassList = ServerSettings.getActivityListeners();
+ for (String listenerClass : listenerClassList) {
+ Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+ AbstractActivityListener abstractActivityListener = aClass.newInstance();
+ activityListeners.add(abstractActivityListener);
+ abstractActivityListener.setup(publisher, registry,zk);
+ log.info("Registering listener: " + listenerClass);
+ publisher.registerListener(abstractActivityListener);
+ }
+ }catch (ClassNotFoundException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (InstantiationException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (IllegalAccessException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (ApplicationSettingsException e){
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ }
+ }
+ public static void startDaemonHandlers() {
+ List<GFacHandlerConfig> daemonHandlerConfig = null;
+ URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ gfacConfigFile = new File(resource.getPath());
+ try {
+ daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+ } catch (ParserConfigurationException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+ } catch (IOException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ } catch (SAXException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ } catch (XPathExpressionException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ }
+
+ for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+ String className = handlerConfig.getClassName();
+ try {
+ Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+ ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+ threadedHandler.initProperties(handlerConfig.getProperties());
+ daemonHandlers.add(threadedHandler);
+ }catch (ClassNotFoundException e){
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (InstantiationException e) {
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (IllegalAccessException e) {
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (GFacHandlerException e) {
+ log.error("Error initializing the handler " + className);
+ } catch (GFacException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ for(ThreadedHandler tHandler:daemonHandlers){
+ (new Thread(tHandler)).start();
+ }
+ }
+
+ /**
+ * This can be used to submit jobs for testing purposes just by filling parameters by hand (JobExecutionContext)
+ */
+ public BetterGfacImpl() {
+ daemonHandlers = new ArrayList<ThreadedHandler>();
+ startDaemonHandlers();
+ }
+
+ /**
+ * This is the job launching method that callers outside GFac can use. It invokes the GFac handler chain and providers
+ * and updates the registry accordingly, so that users can query the database to retrieve status and output from the Registry.
+ *
+ * @param experimentID
+ * @return
+ * @throws GFacException
+ */
+ public boolean submitJob(String experimentID,String taskID) throws GFacException {
+ JobExecutionContext jobExecutionContext = null;
+ try {
+ jobExecutionContext = createJEC(experimentID, taskID);
+ return submitJob(jobExecutionContext);
+ } catch (Exception e) {
+ log.error("Error inovoking the job with experiment ID: " + experimentID);
+ throw new GFacException(e);
+ }
+ }
+
+ private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+ JobExecutionContext jobExecutionContext;
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+ // this is where our new model and old model are mapped (so serviceName in ExperimentData and service name in ServiceDescriptor
+ // have to be the same).
+
+ // 1. Get the Task from the task ID and construct the Job object and save it in to registry
+ // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
+ String serviceName = taskData.getApplicationId();
+ if (serviceName == null) {
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ }
+
+ ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
+ if (serviceDescription == null ) {
+ throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + serviceName );
+ }
+ String hostName;
+ HostDescription hostDescription = null;
+ if(taskData.getTaskScheduling().getResourceHostId() != null){
+ hostName = taskData.getTaskScheduling().getResourceHostId();
+ hostDescription = airavataRegistry2.getHostDescriptor(hostName);
+ }else{
+ List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+ Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+ for (String hostDescName : applicationDescriptors.keySet()) {
+ registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+ }
+ Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
+ hostDescription = hostScheduler.schedule(registeredHosts);
+ hostName = hostDescription.getType().getHostName();
+ }
+ if(hostDescription == null){
+ throw new GFacException("Error executing the job as the host is not registered " + hostName);
+ }
+ ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
+ URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ Properties configurationProperties = ServerSettings.getProperties();
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+
+ // start constructing jobexecutioncontext
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+ // setting experiment/task/workflownode related information
+ Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
+ jobExecutionContext.setExperiment(experiment);
+ jobExecutionContext.setExperimentID(experimentID);
+ jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
+ jobExecutionContext.setTaskData(taskData);
+
+ // setting the registry
+ jobExecutionContext.setRegistry(registry);
+
+ ApplicationContext applicationContext = new ApplicationContext();
+ applicationContext.setApplicationDeploymentDescription(applicationDescription);
+ applicationContext.setHostDescription(hostDescription);
+ applicationContext.setServiceDescription(serviceDescription);
+ jobExecutionContext.setApplicationContext(applicationContext);
+
+ List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
+ jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
+ serviceDescription.getType().getInputParametersArray())));
+
+ List<DataObjectType> outputData = taskData.getApplicationOutputs();
+ jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getMessageContext(outputData,
+ serviceDescription.getType().getOutputParametersArray())));
+
+ jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+
+ return jobExecutionContext;
+ }
+
+ public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
+ // We need to check whether this job is submitted as a part of a large workflow. If yes,
+ // we need to setup workflow tracking listerner.
+ String workflowInstanceID = null;
+ if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+ // This mean we need to register workflow tracking listener.
+ //todo implement WorkflowTrackingListener properly
+ registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+ }
+ // Register log event listener. This is required in all scenarios.
+ jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+ schedule(jobExecutionContext);
+ return true;
+ }
+
+    /**
+     * Runs the job: lets the Scheduler decide the execution flow, invokes the in-flow handlers,
+     * runs the provider (initialize, execute, dispose) if one was set, and invokes the out-flow
+     * handlers when the job runs in synchronous mode.
+     * On any failure the experiment/task/job statuses are published as FAILED, the failure is sent
+     * to the notifier, and the exception is rethrown wrapped in a {@link GFacException}.
+     *
+     * @param jobExecutionContext context of the job to run
+     * @throws GFacException if any step of the execution fails
+     */
+    private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            // Scheduler will decide the execution flow of handlers and provider which handles the job.
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in GFac configuration
+            invokeInFlowHandlers(jobExecutionContext);
+
+            // After executing the in handlers provider instance should be set to job execution context.
+            // We get the provider instance and execute it.
+            GFacProvider provider = jobExecutionContext.getProvider();
+            if (provider != null) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
+                initProvider(provider, jobExecutionContext);
+                executeProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
+            }
+            if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                invokeOutFlowHandlers(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            publishFailedStatuses(jobExecutionContext);
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            // Report the root cause when there is one; fall back to the exception itself so the
+            // fail event never carries a null throwable.
+            Throwable reported = e.getCause() != null ? e.getCause() : e;
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(reported));
+            throw new GFacException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Publishes FAILED for the experiment, for the task (when node and task data are present),
+     * for the job (when a job has been created) and for the GFac experiment state.
+     * Status publishing is best effort: a runtime failure here is logged and not propagated,
+     * so it cannot replace the original execution failure.
+     */
+    private void publishFailedStatuses(JobExecutionContext jobExecutionContext) {
+        try {
+            // we make the experiment as failed due to exception scenario
+            monitorPublisher.publish(new
+                    ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                    ExperimentState.FAILED));
+            if (jobExecutionContext.getWorkflowNodeDetails() != null && jobExecutionContext.getTaskData() != null) {
+                // Updating the task status if there's any task associated
+                monitorPublisher.publish(new TaskStatusChangeRequest(
+                        new TaskIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+                ));
+                // At this point a job might not have been created yet; only then is there a job status to update.
+                if (jobExecutionContext.getJobDetails() != null) {
+                    monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
+                            new JobIdentity(jobExecutionContext.getExperimentID(),
+                                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                    jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
+                }
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+        } catch (RuntimeException statusError) {
+            log.error("Error occurred while updating the Experiment, Task or Job statuses to failed", statusError);
+        }
+    }
+
+ private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.initialize(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while initializing provider " + provider.getClass().getName() + ".", e);
+ }
+ }
+
+ private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.execute(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
+ }
+ }
+
+ private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.dispose(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while invoking provider " + provider.getClass().getName() + " dispose method.", e);
+ }
+ }
+
+    /**
+     * Registers a {@code WorkflowTrackingListener} on the context's notification service, built from
+     * the given workflow instance ID and the node ID, broker URL and topic properties of the context.
+     */
+    private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
+        // All three values are plain property reads from the execution context.
+        String nodeId = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
+        String notificationTopic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+        String broker = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
+
+        jobExecutionContext.getNotificationService().registerListener(
+                new WorkflowTrackingListener(workflowInstanceID, nodeId, broker, notificationTopic));
+    }
+
+    /**
+     * Instantiates and invokes every in-flow handler of the GFac configuration, in list order.
+     * INHANDLERSINVOKING is published before the first handler and INHANDLERSINVOKED after the last one.
+     *
+     * @param jobExecutionContext context handed to each handler
+     * @throws GFacException if a handler class cannot be loaded or instantiated, or if a handler fails
+     */
+    private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                , GfacExperimentState.INHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerConfig : handlers) {
+            // Use the configured class name in error messages rather than the config object's toString().
+            String handlerClassName = handlerConfig.getClassName();
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerConfig.getProperties());
+            } catch (ClassNotFoundException e) {
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (GFacHandlerException e) {
+                // Wrap the handler exception itself: using only e.getCause() loses its message
+                // and yields a null cause when the handler threw without one.
+                throw new GFacException("Error Executing a InFlow Handler", e);
+            }
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                , GfacExperimentState.INHANDLERSINVOKED));
+    }
+
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if(gFacConfiguration != null){
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ }else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ schedule(jobExecutionContext);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+ }
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+ }
+
+
+ /**
+ * @return the AiravataAPI reference held by this class
+ */
+ public AiravataAPI getAiravataAPI() {
+ return airavataAPI;
+ }
+
+ /**
+ * @return the AiravataRegistry2 reference held by this class
+ */
+ public AiravataRegistry2 getAiravataRegistry2() {
+ return airavataRegistry2;
+ }
+
+ /**
+ * @return the class-level (static) list of daemon handlers
+ */
+ public static List<ThreadedHandler> getDaemonHandlers() {
+ return daemonHandlers;
+ }
+
+ /**
+ * @return the ERROR_SENT constant, the property name that is set to "true" on a job execution
+ * context once a failure has been published
+ */
+ public static String getErrorSent() {
+ return ERROR_SENT;
+ }
+
+ /**
+ * @return the GFac configuration file reference held by this class
+ */
+ public File getGfacConfigFile() {
+ return gfacConfigFile;
+ }
+
+ /**
+ * @return the class-level (static) monitor publisher used to publish status change requests
+ */
+ public static MonitorPublisher getMonitorPublisher() {
+ return monitorPublisher;
+ }
+
+ /**
+ * @return the Registry reference held by this class
+ */
+ public Registry getRegistry() {
+ return registry;
+ }
+
+ /**
+ * @return the ZooKeeper handle currently set on this instance
+ */
+ public ZooKeeper getZk() {
+ return zk;
+ }
+
+ /**
+ * Replaces the ZooKeeper handle held by this instance.
+ *
+ * @param zk the ZooKeeper handle to store
+ */
+ public void setZk(ZooKeeper zk) {
+ this.zk = zk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index f1fa244..a6908ba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,11 +58,13 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
@@ -83,6 +85,8 @@ public class GFacImpl implements GFac {
private AiravataAPI airavataAPI;
private AiravataRegistry2 airavataRegistry2;
+
+ private ZooKeeper zk;
private static List<ThreadedHandler> daemonHandlers;
@@ -436,6 +440,8 @@ public class GFacImpl implements GFac {
throw new GFacException("Error Executing a OutFlow Handler", e);
}
}
+
+ monitorPublisher.publish(GfacExperimentState.COMPLETED);
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
monitorPublisher.publish(new