You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/01/27 20:27:14 UTC
[2/5] airavata git commit: retiring workflow tracking schema -
AIRAVATA-1557
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java
deleted file mode 100644
index 225b593..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.state;
-
-import java.net.URI;
-
-import org.apache.airavata.workflow.tracking.common.DataDurationObj;
-import org.apache.airavata.workflow.tracking.common.DataObj;
-
-public class DataDurationImpl extends DurationImpl implements DataDurationObj {
-
- protected DataObj dataObj;
- protected URI remoteLocation;
-
- public DataDurationImpl(DataObj dataObj_, URI remoteLocation_) {
-
- super(); // set start time to now
- dataObj = dataObj_;
- remoteLocation = remoteLocation_;
- }
-
- public DataDurationImpl(DataObj dataObj_, URI remoteLocation_, long fixedDuration) {
-
- super(fixedDuration); // set duration to passed value
- dataObj = dataObj_;
- remoteLocation = remoteLocation_;
- }
-
- public DataObj getDataObj() {
-
- return dataObj;
- }
-
- public URI getRemoteLocation() {
-
- return remoteLocation;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java
deleted file mode 100644
index 89722bb..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.state;
-
-import java.io.File;
-import java.net.URI;
-import java.util.List;
-
-import org.apache.airavata.workflow.tracking.common.DataObj;
-
-public class DataObjImpl implements DataObj {
-
- protected URI dataId;
- protected List<URI> locations;
- protected long sizeInBytes = -1;
-
- public DataObjImpl(URI dataId_, List<URI> location_) {
-
- dataId = dataId_;
- if (dataId == null || dataId.toString().length() == 0)
- throw new RuntimeException("Data ID cannot be NULL or empty");
-
- locations = location_;
- }
-
- public DataObjImpl(URI dataId_, List<URI> location_, long sizeInBytes_) {
-
- this(dataId_, location_);
- sizeInBytes = sizeInBytes_;
- }
-
- public URI getId() {
-
- return dataId;
- }
-
- public URI getLocalLocation() {
-
- return locations != null && locations.size() > 0 ? locations.get(0) : null;
- }
-
- public List<URI> getLocations() {
-
- return locations;
- }
-
- public long getSizeInBytes() {
-
- // skip getting bytes if already calculated or not possible to calculate
- if (sizeInBytes >= 0 || locations == null || locations.size() == 0)
- return sizeInBytes;
-
- // check if the location is a local file. If so, we calculate the size.
- URI location = locations.get(0);
- String scheme = location.getScheme();
- String authority = location.getAuthority();
- if ((scheme == null && authority == null) || "file".equals(scheme)) {
- sizeInBytes = getFileSize(new File(location.getPath()));
- }
- return sizeInBytes;
- }
-
- protected static final long getFileSize(File file) {
- if (file.isDirectory()) {
- return getDirSize(file, 0, true);
- } else {
- return file.length();
- }
- }
-
- private static final long getDirSize(File dir, long size, boolean recurse) {
- File[] files = dir.listFiles();
- if (files == null)
- return size;
- for (int i = 0; i < files.length; i++) {
- if (files[i].isDirectory()) {
- if (recurse)
- size += getDirSize(files[i], size, recurse);
- } else {
- size += files[i].length();
- }
- }
- return size;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java
deleted file mode 100644
index c45e533..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.state;
-
-import org.apache.airavata.workflow.tracking.common.DurationObj;
-
-/**
- * Convinience class to record the state of computation related notifications.
- */
-public class DurationImpl implements DurationObj {
-
- protected long startTimeMillis = Long.MAX_VALUE;
- protected long endTimeMillis = Long.MIN_VALUE;
- protected long fixedDuration = Long.MIN_VALUE;
- protected boolean isFixedDuration = false;
-
- public DurationImpl() {
- startTimeMillis = System.currentTimeMillis();
- }
-
- public DurationImpl(long fixedDuration_) {
- isFixedDuration = true;
- fixedDuration = fixedDuration_;
- }
-
- public long markStartTimeMillis() {
-
- this.startTimeMillis = System.currentTimeMillis();
- return startTimeMillis;
- }
-
- public long getStartTimeMillis() {
-
- return startTimeMillis;
- }
-
- public long markEndTimeMillis() {
-
- this.endTimeMillis = System.currentTimeMillis();
- return endTimeMillis;
- }
-
- public long getEndTimeMillis() {
-
- return endTimeMillis;
- }
-
- public long getDurationMillis() {
-
- if (isFixedDuration)
- return fixedDuration;
- return endTimeMillis - startTimeMillis;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java
deleted file mode 100644
index c2bad60..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.state;
-
-import org.apache.airavata.workflow.tracking.common.InvocationContext;
-import org.apache.airavata.workflow.tracking.common.InvocationEntity;
-
-public class InvocationContextImpl extends InvocationEntityImpl implements InvocationContext {
-
- InvocationEntity remoteEntity;
-
- public InvocationContextImpl(InvocationEntity localEntity_, InvocationEntity remoteEntity_) {
- super(localEntity_);
- remoteEntity = remoteEntity_;
- }
-
- public InvocationEntity getRemoteEntity() {
-
- return remoteEntity;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java
deleted file mode 100644
index ded569a..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.state;
-
-import java.net.URI;
-
-import org.apache.airavata.workflow.tracking.common.InvocationEntity;
-import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
-import org.apache.airavata.workflow.tracking.types.BaseIDType;
-
-public class InvocationEntityImpl implements InvocationEntity {
-
- protected URI workflowID;
- protected URI serviceID;
- protected String workflowNodeID;
- protected Integer workflowTimestep;
-
- /**
- * Constructor used when only service ID is available (i.e. entity not in the context of an invocation)
- *
- * @param serviceID_
- * an URI
- *
- */
- public InvocationEntityImpl(URI serviceID_) {
-
- if (serviceID_ == null)
- throw new RuntimeException("ServiceID passed was null!");
-
- serviceID = serviceID_;
-
- workflowID = null;
- workflowNodeID = null;
- workflowTimestep = null;
- }
-
- /**
- * Constructor used when all IDs are potentially available (i.e. entity in the context of an invocation)
- *
- * @param workflowID_
- * an URI
- * @param serviceID_
- * an URI
- * @param workflowNodeID_
- * a String
- * @param workflowTimestep_
- * an int
- *
- */
- public InvocationEntityImpl(URI workflowID_, URI serviceID_, String workflowNodeID_, Integer workflowTimestep_) {
-
- if (serviceID_ == null)
- throw new RuntimeException("ServiceID passed was null!");
-
- workflowID = workflowID_;
- serviceID = serviceID_;
- workflowNodeID = workflowNodeID_;
- workflowTimestep = workflowTimestep_;
- }
-
- /**
- * Copy Constructor
- *
- * @param source
- * an InvocationEntity
- *
- */
- protected InvocationEntityImpl(InvocationEntity source) {
- this(source.getWorkflowID(), source.getServiceID(), source.getWorkflowNodeID(), source.getWorkflowTimestep());
- }
-
- public String getWorkflowNodeID() {
-
- return workflowNodeID;
- }
-
- public URI getServiceID() {
-
- return serviceID;
- }
-
- public Integer getWorkflowTimestep() {
-
- return workflowTimestep;
- }
-
- public URI getWorkflowID() {
-
- return workflowID;
- }
-
- public BaseIDType toBaseIDType() {
-
- BaseIDType baseID = BaseIDType.Factory.newInstance();
- if (serviceID != null)
- baseID.setServiceID(serviceID.toString());
- if (workflowID != null)
- baseID.setWorkflowID(workflowID.toString());
- if (workflowTimestep != null)
- baseID.setWorkflowTimestep(workflowTimestep);
- if (workflowNodeID != null)
- baseID.setWorkflowNodeID(workflowNodeID);
-
- return baseID;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java
deleted file mode 100644
index 1ed60a8..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.subscription;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.workflow.tracking.client.Callback;
-import org.apache.airavata.workflow.tracking.client.NotificationType;
-import org.apache.airavata.workflow.tracking.client.Subscription;
-import org.apache.airavata.workflow.tracking.util.MessageUtil;
-import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.ConsumerServer;
-import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.xmlbeans.XmlCursor;
-import org.apache.xmlbeans.XmlException;
-import org.apache.xmlbeans.XmlObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility for clients to subscribe and receive Lead notifications using new message schema. The agent implements the
- * LeadNotificationHandler.Callback interface and starts the notification handler with the broker location, the topic,
- * and an option to pull the messages (to get around firewalls) by providing a message box url. The deliverMessage
- * method in the Callback interface in invoked when a message with the nes LEAD message type arrives. If a
- * LeadEvent/NCSAEvent arrives, it is silently dropped after being logged. Check the main() method for sample usage.
- */
-public class LeadNotificationHandler implements ConsumerNotificationHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(LeadNotificationHandler.class);
-
- private String topic;
-
- private String brokerLoc;
-
- private Callback callback;
-
- private int consumerServerPort;
-
- public LeadNotificationHandler(String brokerLoc, String topic, Callback callback, int port) {
- if (port == 0)
- this.consumerServerPort = 2222;
- else
- this.consumerServerPort = port;
- this.brokerLoc = brokerLoc;
- this.topic = topic;
- this.callback = callback;
-
- }
-
- /**
- * NON API Method. Use LeadNotificationManager.CreateSubscription() method to create a subscription
- *
- * @param topic
- * @param callback
- * @return
- * @throws Exception
- */
- public Subscription createSubscription() throws Exception {
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- wseClient.init(brokerLoc);
- logger.debug("Starting Subscription for topic [" + topic + "]at the broker location:" + brokerLoc);
- ConsumerServer xs = new ConsumerServer(consumerServerPort, this);
- xs.start();
- String subscriptionId = wseClient.subscribe(xs.getConsumerServiceEPRs()[0], topic, null);
- logger.debug("The consumer server started on EPR" + xs.getConsumerServiceEPRs()[0]);
- Subscription subscription = new Subscription(xs, subscriptionId, topic, callback, brokerLoc);
- return subscription;
- }
-
- /**
- * NONAPI method Method handleNotification. Called by the message broker when a message arrives at the subscribed
- * topic. Should NOT be called locally. This method will call the Callback interface's deliverMessage when a valid
- * Lead Message is received.
- *
- * @param messageBody
- * the soap message body containing the notification message
- *
- */
-
- public void handleNotification(SOAPEnvelope envelope) {
- OMElement messageContent = envelope.getBody().getFirstElement();
- SOAPHeader soapHeader = envelope.getHeader();
- OMElement topicEl = soapHeader.getFirstChildWithName(new QName(null, "Topic"));
- XmlObject messageObj = null;
-
- if (topicEl != null) {
- if (topicEl.getChildElements().hasNext()) {
- OMElement widgetTopicOMEl = (OMElement) topicEl.getChildElements().next();
- String widgetTopicString = null;
- try {
- widgetTopicString = widgetTopicOMEl.toStringWithConsume();
- } catch (XMLStreamException e) {
- logger.error(e.getMessage(), e);
- }
- String[] topicSubstrings = widgetTopicString.split(":");
- if (topicSubstrings.length > 1) {
- topic = topicSubstrings[1];
- }
- }
- }
-
- if (topic != null) {
- try {
- try {
- messageObj = XmlObject.Factory.parse(messageContent.toStringWithConsume());
- } catch (XMLStreamException e) {
- logger.error("error parsing message content: " + messageContent, e);
- }
- XmlCursor xc = messageObj.newCursor();
- xc.toNextToken();
-
- xc.dispose();
- } catch (XmlException e) {
- logger.error("error parsing message content: " + messageContent, e);
- }
- NotificationType type = MessageUtil.getType(messageObj);
- this.callback.deliverMessage(topic, type, messageObj);
-
- } else {
- logger.warn("Notification came without a Notification Topic:" + envelope);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
deleted file mode 100644
index e452467..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.subscription;
-
-import java.rmi.RemoteException;
-
-import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
-import org.apache.airavata.workflow.tracking.client.Callback;
-import org.apache.airavata.workflow.tracking.client.NotificationType;
-import org.apache.airavata.workflow.tracking.client.Subscription;
-import org.apache.airavata.workflow.tracking.util.MessageUtil;
-import org.apache.airavata.wsmg.client.MsgBrokerClientException;
-import org.apache.airavata.wsmg.client.NotificationHandler;
-import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
-import org.apache.airavata.wsmg.client.msgbox.MessagePuller;
-import org.apache.airavata.wsmg.client.msgbox.MsgboxHandler;
-import org.apache.airavata.wsmg.commons.MsgBoxQNameConstants;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.apache.xmlbeans.XmlCursor;
-import org.apache.xmlbeans.XmlException;
-import org.apache.xmlbeans.XmlObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MessageBoxNotificationHandler implements NotificationHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(MessageBoxNotificationHandler.class);
-
- private String messageBoxUrl;
-
- private String brokerURL;
-
- private String subscriptionId;
-
- private MessagePuller messagePuller;
-
- private Callback callback;
-
- private String topic;
-
- public MessageBoxNotificationHandler(String messageBoxUrl, String brokerURL) {
- if (messageBoxUrl == null || "".equals(messageBoxUrl)) {
- logger.error("Invalid messagebox Location :" + messageBoxUrl);
- throw new WorkflowTrackingException("BrokerLocation should be not null messaboxUrl:" + messageBoxUrl);
- }
-
- if (brokerURL == null || "".equals(brokerURL)) {
- logger.error("Invalid broker Location :" + brokerURL);
- throw new WorkflowTrackingException("BrokerLocation should be not null brokerurl:" + brokerURL);
- }
- this.messageBoxUrl = messageBoxUrl;
- this.brokerURL = brokerURL;
-
- }
-
- public void handleNotification(String message) {
- XmlObject messageObj = null;
-
- try {
- messageObj = XmlObject.Factory.parse(message);
- XmlCursor xc = messageObj.newCursor();
- xc.toNextToken();
-
- xc.dispose();
- } catch (XmlException e) {
- logger.error("error parsing message content: " + message, e);
- }
- NotificationType type = MessageUtil.getType(messageObj);
- this.callback.deliverMessage(this.topic, type, messageObj);
-
- }
-
- public void destroy(EndpointReference msgBoxEpr) throws RemoteException {
- if (this.messagePuller != null) {
- messagePuller.stopPulling();
-
- if (logger.isDebugEnabled())
- logger.debug("\n\nStopping the Messagebox for topic" + this.topic);
- }
-
- try {
- WseMsgBrokerClient client = new WseMsgBrokerClient();
- client.init(this.brokerURL.toString());
- client.unSubscribe(this.subscriptionId);
-
- MsgboxHandler msgboxHandler = new MsgboxHandler();
-
- logger.debug("Unsubscribing the messagebox that was destroyed," + " SubscriptionID:" + this.subscriptionId);
-
- msgboxHandler.deleteMsgBox(msgBoxEpr, 2000L);
-
- } catch (MsgBrokerClientException e) {
- logger.error("unable to unsubscribe", e);
- }
-
- }
-
- public Subscription renewMessageboxSubscription(String epr, String subscriptionId, String topic, String xpath,
- boolean subscribePermanatly) throws MsgBrokerClientException {
-
- this.subscriptionId = subscriptionId;
- this.topic = topic;
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- EndpointReference endpointReference = null;
- try {
- endpointReference = EndpointReferenceHelper.fromString(epr);
- } catch (AxisFault f) {
- throw new MsgBrokerClientException("unable to convert end point reference", f);
- }
- subscriptionId = subscribeToBroker(endpointReference.getAddress(), topic, xpath, wseClient, subscribePermanatly);
- Subscription subscription = new Subscription(this, subscriptionId, topic, callback, this.brokerURL);
- subscription.setMessageBoxEpr(endpointReference);
- return subscription;
- }
-
- public Subscription renewMessageboxSubscription(EndpointReference endpointReference, String subscriptionId,
- String topic, String xpath, boolean subscribePermanatly) throws MsgBrokerClientException {
-
- this.subscriptionId = subscriptionId;
- this.topic = topic;
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- logger.debug("\n\nCreate Subscription for topic" + topic + " [Messagebox]\n\n");
-
- subscriptionId = subscribeToBroker(endpointReference.getAddress(), topic, xpath, wseClient, subscribePermanatly);
- Subscription subscription = new Subscription(this, subscriptionId, topic, callback, this.brokerURL);
- subscription.setMessageBoxEpr(endpointReference);
- return subscription;
- }
-
- public Subscription startListeningToPreviousMessageBox(EndpointReference msgBoxAddr, String subscriptionId,
- String topic, String xpath, Callback callback, boolean subscribePermanatly) throws MsgBrokerClientException {
- this.callback = callback;
- this.subscriptionId = subscriptionId;
- this.topic = topic;
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- MsgboxHandler msgboxHandler = new MsgboxHandler();
-
- messagePuller = msgboxHandler.startPullingFromExistingMsgBox(msgBoxAddr, this, 500L, 1000L);
- if (logger.isDebugEnabled())
- logger.debug("\n\nCreate Subscription for topic" + topic + " [Messagebox]\n\n");
- String msgBoxEventSink = msgBoxAddr.getAddress();
-
- String formattedEventSink = null;
-
- if (msgBoxEventSink.contains("clientid")) {
- formattedEventSink = msgBoxEventSink;
- } else {
- if (msgBoxAddr.getAllReferenceParameters() == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, no reference parameters found");
- String msgBoxId = msgBoxAddr.getAllReferenceParameters().get(MsgBoxQNameConstants.MSG_BOXID_QNAME)
- .getText();
- if (msgBoxId == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, reference parameter MsgBoxAddr is missing");
- String format = msgBoxEventSink.endsWith("/") ? "%sclientid/%s" : "%s/clientid/%s";
-
- formattedEventSink = String.format(format, msgBoxEventSink, msgBoxId);
-
- }
-
- subscriptionId = subscribeToBroker(formattedEventSink, topic, xpath, wseClient, subscribePermanatly);
- Subscription subscription = new Subscription(this, subscriptionId, topic, callback, this.brokerURL);
- subscription.setMessageBoxEpr(msgBoxAddr);
- return subscription;
-
- }
-
- private String subscribeToBroker(String eventSink, String topic, String xpath, WseMsgBrokerClient wseClient,
- boolean subscribePermanatly) throws MsgBrokerClientException {
- String subId = null;
-
- wseClient.init(brokerURL);
- if (subscribePermanatly) {
-
- subId = wseClient.subscribe(new EndpointReference(eventSink), topic, xpath, -1);
- } else {
- subId = wseClient.subscribe(eventSink, topic, xpath);
- }
- return subId;
- }
-
- private String subToBrokerWithMsgBoxSink(EndpointReference msgBoxEpr, String topic, String xpath,
- WseMsgBrokerClient wseClient, boolean subscribePermanatly) throws MsgBrokerClientException {
- String subId;
- wseClient.init(brokerURL);
-
- if (subscribePermanatly) {
-
- subId = wseClient.subscribeMsgBox(msgBoxEpr, topic, xpath, -1);
- } else {
- subId = wseClient.subscribeMsgBox(msgBoxEpr, topic, xpath,
- WsmgCommonConstants.DEFAULT_SUBSCRIPTION_EXPIRATION_TIME);
- }
- return subId;
- }
-
- public Subscription createSubscription(String topic, String xpath, Callback callback, boolean subscribePermananly)
- throws Exception {
- this.topic = topic;
- this.callback = callback;
-
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- MsgboxHandler msgboxHandler = new MsgboxHandler();
- EndpointReference msgBoxAddr = msgboxHandler.createPullMsgBox(this.messageBoxUrl, 12000l);
-
- String messageBoxAddress = msgBoxAddr.getAddress();
- if (logger.isDebugEnabled())
- logger.debug("\n\nCreated Messagebox at address :" + messageBoxAddress);
-
- subscriptionId = subToBrokerWithMsgBoxSink(msgBoxAddr, topic, xpath, wseClient, subscribePermananly);
- messagePuller = msgboxHandler.startPullingEventsFromMsgBox(msgBoxAddr, this, 1500L, 30000l);
- if (logger.isDebugEnabled())
- logger.debug("\n\nCreate Subscription for topic" + topic + " [Messagebox]\n\n");
-
- Subscription subscription = new Subscription(this, subscriptionId, topic, callback, this.brokerURL);
- subscription.setMessageBoxEpr(msgBoxAddr);
- subscription.setBrokerURL(this.brokerURL);
- return subscription;
- }
-
- public Subscription createMsgBoxSubscription(String topic2, String xpath, Callback callback2,
- boolean subscribePermanatly) throws MsgBrokerClientException {
-
- this.topic = topic2;
- this.callback = callback2;
-
- WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
- MsgboxHandler msgboxHandler = new MsgboxHandler();
- EndpointReference msgBoxAddr = msgboxHandler.createPullMsgBox(this.messageBoxUrl, 12000l);
- if (logger.isDebugEnabled())
- logger.debug("\n\nCreated Messagebox at address :" + msgBoxAddr.getAddress());
-
- subscriptionId = subToBrokerWithMsgBoxSink(msgBoxAddr, topic, xpath, wseClient, subscribePermanatly);
- messagePuller = msgboxHandler.startPullingEventsFromMsgBox(msgBoxAddr, this, 500L, 30000l);
- if (logger.isDebugEnabled())
- logger.debug("\n\nCreate Subscription for topic" + topic + " [Messagebox]\n\n");
- Subscription subscription = new Subscription(this, subscriptionId, topic, callback, this.brokerURL);
- subscription.setMessageBoxEpr(msgBoxAddr);
- subscription.setBrokerURL(this.brokerURL);
- return subscription;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/CallbackHandler.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/CallbackHandler.java
deleted file mode 100644
index ef58654..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/CallbackHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Lanka Software Foundation (LSF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The LSF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.workflow.tracking.samples.listener;
-
-import java.rmi.RemoteException;
-
-import org.apache.airavata.workflow.tracking.client.NotificationType;
-import org.apache.airavata.workflow.tracking.client.Subscription;
-import org.apache.xmlbeans.XmlObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CallbackHandler implements org.apache.airavata.workflow.tracking.client.Callback {
-
- private Subscription subscription;
- private static final Logger log = LoggerFactory.getLogger(CallbackHandler.class);
-
-
- /*
- * This methods will be callbacked when the particular subcription receives a notification (non-Javadoc)
- *
- * @see org.apache.airavata.workflow.tracking.client.Callback#deliverMessage(java.lang.String,
- * org.apache.airavata.workflow.tracking.client.NotificationType, org.apache.xmlbeans.XmlObject)
- */
- public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
- System.out.println("Received a notification of type[" + notificationType + "] for the topic[" + topic);
- System.out.println("The notification message is:");
- System.out.println(messageObj.toString());
- if (subscription != null && Listener.finalNotification.equals(messageObj.toString())) {
- try {
- subscription.destroy();
- } catch (RemoteException e) {
- log.error(e.getMessage(), e);
- }
- System.out.println("Ending the subscription and exiting");
- System.exit(0);
- }
- }
-
- public void setSubscription(Subscription subscription) {
- this.subscription = subscription;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/Listener.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/Listener.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/Listener.java
deleted file mode 100644
index 4e519c6..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/listener/Listener.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.samples.listener;
-
-import java.io.StringReader;
-
-import org.apache.airavata.commons.WorkFlowUtils;
-import org.apache.airavata.workflow.tracking.client.LeadNotificationManager;
-import org.apache.airavata.workflow.tracking.client.Subscription;
-import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axis2.addressing.EndpointReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Listener {
-
- private static OMFactory factory = OMAbstractFactory.getOMFactory();
- private static final Logger log = LoggerFactory.getLogger(Listener.class);
-
- public static final String finalNotification = "<end>This is the last Notification. end your subscription when you receive this</end>";
-
- /**
- * @param args
- */
- public static void main(String[] args) {
-
- String brokerLocation = args.length > 0 ? args[0] : "localhost:8080/axis2/services/EventingService";
- String topic = "pickTheTOpicThatWorkflowPublishTheEventsFrom";
-
- Subscription subscription = null;
- // Create a sbscription
- try {
- // create a callback
- CallbackHandler callback = new CallbackHandler();
- // create the subscription
- subscription = LeadNotificationManager.createSubscription(brokerLocation, topic, callback, 2222);
- // set the subscription in the callback so we could destroy the
- // subscription within the callback
- callback.setSubscription(subscription);
- } catch (Exception e) {
- // Falied to create subscription
- log.error("Failed to create Subscription", e);
- // do what you want to do instead of rethrowing. e.g. like retrying
- throw new RuntimeException(e);
- }
-
- // Subscription is created and now we listen. Now the workflow should
- // publish notification with
- // that particular topic.
- // Inthis sample we emulate it by manually publishing notifications
-
- // created a publisher
-
- WseMsgBrokerClient publisher = new WseMsgBrokerClient();
- EndpointReference endpointRef = publisher.createEndpointReference(brokerLocation, topic);
- publisher.init(endpointRef.getAddress());
-
- try {
-
- OMElement finalNotificationEl = WorkFlowUtils.reader2OMElement(new StringReader(finalNotification));
-
- OMElement testNotification = factory.createOMElement("Test", null);
-
- testNotification.setText("test event for workflow tracking sample");
-
- publisher.publish(null, testNotification);
- publisher.publish(null, finalNotificationEl);
- } catch (Exception e) {
- log.error("Failed to publish", e);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/CallbackHandler.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/CallbackHandler.java
deleted file mode 100644
index 3d63843..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/CallbackHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.samples.simple_listener;
-
-import org.apache.airavata.workflow.tracking.client.NotificationType;
-import org.apache.xmlbeans.XmlObject;
-
-public class CallbackHandler implements org.apache.airavata.workflow.tracking.client.Callback {
-
- /*
- * This methods will be callbacked when the particular subcription receives a notification (non-Javadoc)
- *
- * @see org.apache.airavata.workflow.tracking.client.Callback#deliverMessage(java.lang.String,
- * org.apache.airavata.workflow.tracking.client.NotificationType, org.apache.xmlbeans.XmlObject)
- */
- public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
- System.out.println("Received a notification of type[" + notificationType + "] for the topic[" + topic);
- System.out.println("The notification message is:");
- System.out.println(messageObj.toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/SimpleListener.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/SimpleListener.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/SimpleListener.java
deleted file mode 100644
index e876559..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/samples/simple_listener/SimpleListener.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.samples.simple_listener;
-
-import java.rmi.RemoteException;
-
-import org.apache.airavata.workflow.tracking.client.LeadNotificationManager;
-import org.apache.airavata.workflow.tracking.client.Subscription;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleListener {
-
- private static final Logger log = LoggerFactory.getLogger(SimpleListener.class);
- /**
- * @param args
- * @throws RemoteException
- */
- public static void main(String[] args) throws RemoteException {
-
- String brokerLocation = args.length > 0 ? args[0] : "http://localhost:8080/axis2/services/EventingService";
- // "rainier.extreme.indiana.edu:12346";
- String topic = "pickTheTOpicThatWorkflowPublishTheEventsFrom";
-
- System.out.println(LeadNotificationManager.getBrokerPublishEPR(brokerLocation, topic));
- Subscription subscription = null;
- // Create a sbscription
- try {
- subscription = LeadNotificationManager.createSubscription(brokerLocation, topic,
- new org.apache.airavata.workflow.tracking.samples.simple_listener.CallbackHandler(), 2222);
- } catch (Exception e) {
- // Falied to create subscription
- log.error("Failed to publish", e);
- // do what you want to do instead of rethrowing. e.g. like retrying
- throw new RuntimeException(e);
- }
-
- System.out.println(subscription.getBrokerPublishEPR());
- subscription.destroy();
- System.out.println("Subscription cleared");
- System.exit(0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/ActivityTime.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/ActivityTime.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/ActivityTime.java
deleted file mode 100644
index 734108b..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/ActivityTime.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.util;
-
-import java.util.Date;
-
-public class ActivityTime implements Comparable {
-
- private Date clockTime;
- private int logicalTime;
-
- public ActivityTime(int logicalTime_, Date clockTime_) {
- logicalTime = logicalTime_;
- clockTime = clockTime_;
- }
-
- public Date getClockTime() {
- return clockTime;
- }
-
- public int getLogicalTime() {
- return logicalTime;
- }
-
- public int compareTo(Object o) {
- if (o == null)
- throw new NullPointerException();
- if (!ActivityTime.class.isAssignableFrom(o.getClass())) {
- throw new ClassCastException("cannot assign " + o.getClass() + " to " + ActivityTime.class);
- }
- // start comparison
- ActivityTime other = (ActivityTime) o;
- // compare logical time first if they are both positive
- if (this.logicalTime >= 0 && other.logicalTime >= 0) {
- if (this.logicalTime > other.logicalTime)
- return +1;
- if (this.logicalTime < other.logicalTime)
- return -1;
- assert this.logicalTime == other.logicalTime;
- }
- // both logical times are equal or not set
- // compare wallclock time
- return this.clockTime.compareTo(other.clockTime);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o != null) {
- return compareTo(o) == 0;
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "WF Timestep[" + logicalTime + "] Timestamp[" + clockTime + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
deleted file mode 100644
index d25196f..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.util;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * similar to linkedblocking queue but Non-concurrent version. can have only one thread putting elements into queue and
- * another thread getting elements from queue. has added method get that does a block-peek that is missing in
- * linkedblocking queue. implemented using a linked list.
- */
-public class LinkedMessageQueue<E> implements Iterable {
-
- private final LinkedList<E> list;
- private final AtomicInteger count;
- private final int capacity;
-
- private final Object takeLock = new Object();
- private final Object putLock = new Object();
-
- private boolean canStop = false;
-
- public LinkedMessageQueue() {
- this(Integer.MAX_VALUE); // default capacity is MAX_INT
- }
-
- public LinkedMessageQueue(int maxSize) {
- list = new LinkedList<E>();
- count = new AtomicInteger(0);
- capacity = maxSize;
- }
-
- /*** add, offer, and put are called by application thread adding to notification queue ***/
-
- /** add to tail of queue if not full; throw exceptionif unable to add */
- public final void add(E entry) {
-
- if (count.get() >= capacity) {
- throw new IllegalStateException("Cannot add element. queue is full.");
- }
-
- list.add(entry);
- count.incrementAndGet();
-
- synchronized (putLock) {
- putLock.notify();
- }
- }
-
- /** add to tail of queue if possible; return false if unable to add */
- public final boolean offer(E entry) {
-
- if (count.get() >= capacity) {
- return false;
- }
-
- list.add(entry);
- count.incrementAndGet();
-
- synchronized (putLock) {
- putLock.notify();
- }
- return true;
- }
-
- /** add to tail of queue, blocking if necessary */
- public final void put(E entry) throws InterruptedException {
-
- if (count.get() >= capacity) { // do initial check before checking & waiting
- synchronized (takeLock) {
- while (count.get() >= capacity) {
- takeLock.wait();
- }
- }
- }
-
- list.add(entry);
- count.incrementAndGet();
-
- synchronized (putLock) {
- putLock.notify();
- }
- }
-
- /*** poll, get, peek, and take are called by publisher thread removing from notification queue ***/
-
- /** return & remove head of queue; do not block & return null if none available */
- public final E poll() {
-
- if (count.get() <= 0)
- return null;
-
- count.decrementAndGet();
- E entry = list.removeFirst();
-
- synchronized (takeLock) {
- takeLock.notify();
- }
- return entry;
- }
-
- /** return (but dont remove) head of queue; block if empty */
- public final E get() throws InterruptedException {
-
- if (count.get() <= 0) { // do initial check before checking & waiting
- while (count.get() <= 0 && !canStop) {
- synchronized (putLock) {
- putLock.wait(1);
- }
- return null;
- }
- }
-
- return list.getFirst();
- }
-
- /** return (but dont remove) head of queue; return null if empty */
- public final E peek() {
-
- if (count.get() <= 0)
- return null;
- else
- return list.getFirst();
- }
-
- /** return & remove head of queue; block if empty */
- public final E take() throws InterruptedException {
-
- if (count.get() <= 0) { // do initial check before checking & waiting
- synchronized (putLock) {
- while (count.get() <= 0) {
- putLock.wait();
- }
- }
- }
-
- count.decrementAndGet();
- final E entry = list.removeFirst();
-
- synchronized (takeLock) {
- takeLock.notify();
- }
- return entry;
- }
-
- /** return number of entries in queue */
- public final int size() {
-
- return count.get();
- }
-
- public final Iterator<E> iterator() {
-
- return list.iterator();
- }
-
- public void setCanStop(boolean canStop) {
- this.canStop = canStop;
- }
-}