You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/01/27 20:27:15 UTC
[3/5] airavata git commit: retiring workflow tracking schema -
AIRAVATA-1557
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierException.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierException.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierException.java
deleted file mode 100644
index a0e194c..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.common;
-
-public class NotifierException extends RuntimeException {
- public NotifierException(String msg) {
- super(msg);
- }
-
- public NotifierException(Throwable ex) {
- super(ex);
- }
-
- public NotifierException(String msg, Throwable ex) {
- super(msg, ex);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierVersion.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierVersion.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierVersion.java
deleted file mode 100644
index 51ca5f6..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/NotifierVersion.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.common;
-
-/**
- * One place to put and check required version number.
- */
-public class NotifierVersion {
-
- private final static String TYPES_VERSION = "2.6";
- private final static String IMPL_VERSION = "2.8.0";
-
- public static String getTypesVersion() {
- return TYPES_VERSION;
- }
-
- public static String getImplVersion() {
- return IMPL_VERSION;
- }
-
- /**
- * Print version when exxecuted from command line.
- */
- public static void main(String[] args) {
- String IMPL_OPT = "-impl";
- String TYPE_OPT = "-types";
- if (IMPL_OPT.equals(args[0])) {
- System.out.println(IMPL_VERSION);
- } else if (TYPE_OPT.equals(args[0])) {
- System.out.println(TYPES_VERSION);
- } else {
- System.out.println(NotifierVersion.class.getName() + " Error: " + TYPE_OPT + " or " + IMPL_OPT
- + " is required");
- System.exit(-1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/WorkflowTrackingContext.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/WorkflowTrackingContext.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/WorkflowTrackingContext.java
deleted file mode 100644
index bf35162..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/common/WorkflowTrackingContext.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.common;
-
-import java.util.Properties;
-
-import org.apache.axis2.addressing.EndpointReference;
-
-public class WorkflowTrackingContext {
- private EndpointReference brokerEpr;
- private String globalAnnotations;
- private InvocationEntity myself;
- private String publisherImpl;
- private boolean enableAsyncPublishing;
- private String topic;
-
- public void setGlobalAnnotations(Properties globalAnnotations) {
- StringBuffer buf = new StringBuffer();
- if (globalAnnotations != null) {
- for (Object key : globalAnnotations.keySet()) {
- buf.append("<").append(key).append(">").append(globalAnnotations.get(key)).append("</").append(key)
- .append(">");
- }
- }
- this.globalAnnotations = buf.toString();
- }
-
- public String getGlobalAnnotations() {
- return globalAnnotations;
- }
-
- public InvocationEntity getMyself() {
- return myself;
- }
-
- public void setMyself(InvocationEntity myself) {
- this.myself = myself;
- }
-
- public EndpointReference getBrokerEpr() {
- return brokerEpr;
- }
-
- public void setBrokerEpr(EndpointReference brokerEpr) {
- this.brokerEpr = brokerEpr;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPublisherImpl() {
- return publisherImpl;
- }
-
- public void setPublisherImpl(String publisherImpl) {
- this.publisherImpl = publisherImpl;
- }
-
- public boolean isEnableAsyncPublishing() {
- return enableAsyncPublishing;
- }
-
- public void setEnableAsyncPublishing(boolean enableAsyncPublishing) {
- this.enableAsyncPublishing = enableAsyncPublishing;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/GenericNotifierImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/GenericNotifierImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/GenericNotifierImpl.java
deleted file mode 100644
index 478b826..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/GenericNotifierImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl;
-
-import org.apache.airavata.workflow.tracking.AbstractNotifier;
-import org.apache.airavata.workflow.tracking.GenericNotifier;
-import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
-import org.apache.airavata.workflow.tracking.common.InvocationContext;
-import org.apache.airavata.workflow.tracking.common.InvocationEntity;
-import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
-import org.apache.airavata.workflow.tracking.impl.state.InvocationContextImpl;
-import org.apache.airavata.workflow.tracking.types.BaseNotificationType;
-import org.apache.airavata.workflow.tracking.types.LogDebugDocument;
-import org.apache.airavata.workflow.tracking.types.LogExceptionDocument;
-import org.apache.airavata.workflow.tracking.types.LogInfoDocument;
-import org.apache.airavata.workflow.tracking.types.LogWarningDocument;
-import org.apache.airavata.workflow.tracking.types.PublishURLDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING
- *
- * The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
- * TIMESTEP, SERVICE_ID, ASYNC_PUB_MODE
- */
-public class GenericNotifierImpl extends AbstractNotifier implements GenericNotifier {
-
- // private AnnotationProps globalAnnotations;
-
- protected static final Logger logger = LoggerFactory.getLogger(GenericNotifierImpl.class);
-
- public GenericNotifierImpl() throws WorkflowTrackingException {
- super();
- }
-
- public InvocationContext createInitialContext(WorkflowTrackingContext context) {
- if (context.getMyself() == null) {
- throw new RuntimeException("Local entity passed to createInitialContext was NULL");
- }
- return new InvocationContextImpl(context.getMyself(), null);
- }
-
- public InvocationContext createInvocationContext(WorkflowTrackingContext context, InvocationEntity remoteEntity) {
-
- if (context.getMyself() == null) {
- throw new RuntimeException("Local entity passed to createInitialContext was NULL");
- }
-
- if (remoteEntity == null) {
- throw new RuntimeException("Remote entity passed to createInitialContext was NULL");
- }
-
- return new InvocationContextImpl(context.getMyself(), remoteEntity);
- }
-
- public void debug(WorkflowTrackingContext context, String... descriptionAndAnnotation) {
- LogDebugDocument logMsg = LogDebugDocument.Factory.newInstance();
-
- // add timestamp and notification source; add description, and
- // annotation if present
- sendNotification(context, logMsg, descriptionAndAnnotation, null);
- }
-
- public void exception(WorkflowTrackingContext context, String... descriptionAndAnnotation) {
- LogExceptionDocument logMsg = LogExceptionDocument.Factory.newInstance();
- BaseNotificationType log = logMsg.addNewLogException();
- sendNotification(context, logMsg, descriptionAndAnnotation, null);
- }
-
- public void info(WorkflowTrackingContext context, String... descriptionAndAnnotation) {
- LogInfoDocument logMsg = LogInfoDocument.Factory.newInstance();
- BaseNotificationType log = logMsg.addNewLogInfo();
- // add timestamp and notification source; add description, and
- // annotation if present
- // publish activity
- sendNotification(context, logMsg, descriptionAndAnnotation, null);
- }
-
- public void publishURL(WorkflowTrackingContext context, String title, String url,
- String... descriptionAndAnnotation) {
- PublishURLDocument pubMsg = PublishURLDocument.Factory.newInstance();
- PublishURLDocument.PublishURL pub = pubMsg.addNewPublishURL();
- pub.setTitle(title);
- pub.setLocation(url);
- sendNotification(context, pubMsg, descriptionAndAnnotation, null);
- }
-
- public void warning(WorkflowTrackingContext context, String... descriptionAndAnnotation) {
- LogWarningDocument logMsg = LogWarningDocument.Factory.newInstance();
- BaseNotificationType log = logMsg.addNewLogWarning();
- sendNotification(context, logMsg, descriptionAndAnnotation, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/NotifierImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/NotifierImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/NotifierImpl.java
deleted file mode 100644
index f7c3c4a..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/NotifierImpl.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.airavata.commons.LeadCrosscutParametersUtil;
-import org.apache.airavata.workflow.tracking.Notifier;
-import org.apache.airavata.workflow.tracking.common.DataDurationObj;
-import org.apache.airavata.workflow.tracking.common.DataObj;
-import org.apache.airavata.workflow.tracking.common.DurationObj;
-import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
-import org.apache.airavata.workflow.tracking.impl.state.DataDurationImpl;
-import org.apache.airavata.workflow.tracking.impl.state.DataObjImpl;
-import org.apache.airavata.workflow.tracking.impl.state.DurationImpl;
-import org.apache.airavata.workflow.tracking.types.*;
-
-/**
- * DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING * Utility to create and send
- * Lead notification messages for an application (script/web service). Since it extends WorkflowNotifierImpl, it can
- * also send workflow related notifications.
- *
- * The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
- * TIMESTEP, SERVICE_ID, SERVICE_WSDL, IN_XML_MESSAGE, NAME_RESOLVER_URL, FILE_ACCESS_PROTOCOL, DISABLE_NAME_RESOLVER,
- * BATCH_PROVENANCE_MSGS, ASYNC_PUB_MODE
- *
- */
-public class NotifierImpl extends ProvenanceNotifierImpl implements Notifier {
-
- private static final String WFT_NS = "http://lead.extreme.indiana.edu/namespaces/2006/06/workflow_tracking";
-
- private static final HashMap<String, String> NS_MAP = new HashMap<String, String>();
- static {
- NS_MAP.put("", WFT_NS);
- }
-
- // public NotifierImpl(ConstructorProps props) throws XMLStreamException, IOException {
- // super( props);
- // }
-
- /**
- * @param batchProvMessages
- * whether provenance messages should be batched and sent as one message
- * @param publisher
- * a NotificationPublisher used to send the notifications
- *
- */
- public NotifierImpl() {
- }
-
- // //////////////////////////////////////////////////////////////////////////////////////////////
- //
- // AUDIT NOTIFIER
- //
- // //////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- */
- public void resourceMapping(WorkflowTrackingContext context, String mappedResource, int retryStatusCount,
- String... descriptionAndAnnotation) {
-
- ResourceMappingDocument mapMsg = ResourceMappingDocument.Factory.newInstance();
- ResourceMappingType map = mapMsg.addNewResourceMapping();
- map.setMappedResource(mappedResource);
- map.setRetryStatusCount(retryStatusCount);
-
- sendNotification(context, mapMsg, descriptionAndAnnotation, "[Resource mapping done for" + mappedResource + "]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void jobStatus(WorkflowTrackingContext context, String status, int retryCount,
- String... descriptionAndAnnotation) {
-
- JobStatusDocument jobMsg = JobStatusDocument.Factory.newInstance();
- JobStatusType job = jobMsg.addNewJobStatus();
- job.setJobStatus(status);
- job.setRetryCount(retryCount);
-
- sendNotification(context, jobMsg, descriptionAndAnnotation, "[Job status is " + status + "]");
- }
-
- // //////////////////////////////////////////////////////////////////////////////////////////////
- //
- // AUDIT NOTIFIER
- //
- // //////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- */
- public void appAudit(WorkflowTrackingContext context, String name, URI jobHandle, String host, String queueName,
- String jobId, String dName, String projectId, String rsl, String... descriptionAndAnnotation) {
- final ApplicationAuditDocument appAuditMsg = ApplicationAuditDocument.Factory.newInstance();
- final ApplicationAuditType appAudit = appAuditMsg.addNewApplicationAudit();
- appAudit.setJobHandle(jobHandle.toString());
- appAudit.setName(name);
- appAudit.setHost(host);
- appAudit.setQueueName(queueName); // queueName is an optional element
- appAudit.setJobId(jobId); // jobId is an optional element
- appAudit.setDistinguishedName(dName);
- appAudit.setProjectId(projectId); // projectId is an optional element
- appAudit.setRsl(rsl);
-
- sendNotification(context, appAuditMsg, descriptionAndAnnotation, "[Audit msg for '" + name + "' at host "
- + host + " for DN " + dName + "]" // default
- );
- }
-
- // //////////////////////////////////////////////////////////////////////////////////////////////
- //
- // PERFORMANCE NOTIFIER
- //
- // //////////////////////////////////////////////////////////////////////////////////////////////
- /**
- * {@inheritDoc}
- *
- */
- public DurationObj computationStarted() {
-
- return new DurationImpl();
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DurationObj computationFinished(WorkflowTrackingContext context, DurationObj compObj,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (compObj == null)
- throw new RuntimeException("Comp duration object passed was NULL.");
-
- // mark computation end
- compObj.markEndTimeMillis();
-
- // create activity
- ComputationDurationDocument activity = ComputationDurationDocument.Factory.newInstance();
- ComputationDurationDocument.ComputationDuration activityType = activity.addNewComputationDuration();
-
- activityType.setDurationInMillis(compObj.getDurationMillis());
-
- sendNotification(context, activity, descriptionAndAnnotation,
- "[Computation Time taken = " + compObj.getDurationMillis() + " ms]");
-
- return compObj;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DurationObj computationDuration(WorkflowTrackingContext context, long durationMillis,
- String... descriptionAndAnnotation) {
-
- DurationObj compObj = new DurationImpl(durationMillis);
- return computationFinished(context, compObj, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataSendStarted(DataObj dataObj, URI remoteLocation) {
-
- return new DataDurationImpl(dataObj, remoteLocation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataSendFinished(WorkflowTrackingContext context, DataDurationObj dataDurationObj,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (dataDurationObj == null)
- throw new RuntimeException("Data duration object passed was NULL.");
-
- DataObj dataObj = null;
- if ((dataObj = dataDurationObj.getDataObj()) == null)
- throw new RuntimeException("Data duration object's DataObje was NULL.");
- if (dataObj.getId() == null)
- throw new RuntimeException("Data object's ID was NULL.");
- if (dataObj.getLocalLocation() == null)
- throw new RuntimeException("Local file URL passed in DataDurationObj.getDataObj was NULL.");
- if (dataDurationObj.getRemoteLocation() == null)
- throw new RuntimeException("Remote file URL passed in DataDurationObj was NULL.");
-
- // mark computation end
- dataDurationObj.markEndTimeMillis();
-
- // create activity
- DataSendDurationDocument activity = DataSendDurationDocument.Factory.newInstance();
- DataTransferDurationType activityType = activity.addNewDataSendDuration();
-
- activityType.setId(dataObj.getId().toString());
- activityType.setDurationInMillis(dataDurationObj.getDurationMillis());
- activityType.setSizeInBytes(dataObj.getSizeInBytes());
-
- activityType.setSource(dataObj.getLocalLocation().toString());
-
- activityType.setTarget(dataDurationObj.getRemoteLocation().toString());
-
- sendNotification(context, activity, descriptionAndAnnotation, "[Data at " + dataObj.getLocalLocation()
- + " was sent to " + dataDurationObj.getRemoteLocation() + "]");
-
- return dataDurationObj;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataSendDuration(WorkflowTrackingContext context, URI dataID, URI localLocation,
- URI remoteLocation, int sizeInBytes, long durationMillis, String... descriptionAndAnnotation) {
-
- List<URI> locations = new ArrayList<URI>(2);
- locations.add(localLocation);
- locations.add(remoteLocation);
-
- DataObj dataObj = new DataObjImpl(dataID, locations, sizeInBytes);
- DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation, durationMillis);
-
- return dataSendFinished(context, dataDurationObj, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataReceiveStarted(URI dataID, URI remoteLocation, URI localLocation) {
-
- List<URI> locations = new ArrayList<URI>(2);
- locations.add(localLocation);
- locations.add(remoteLocation);
-
- DataObj dataObj = new DataObjImpl(dataID, locations);
- DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation);
-
- return dataDurationObj;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataReceiveFinished(WorkflowTrackingContext context, DataDurationObj dataDurationObj,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (dataDurationObj == null)
- throw new RuntimeException("Data duration object passed was NULL.");
-
- DataObj dataObj = null;
- if ((dataObj = dataDurationObj.getDataObj()) == null)
- throw new RuntimeException("Data duration object's DataObj was NULL.");
- if (dataObj.getId() == null)
- throw new RuntimeException("Data object's ID was NULL.");
- if (dataObj.getLocalLocation() == null)
- throw new RuntimeException("Local file URL passed in DataDurationObj.getDataObj was NULL.");
- if (dataDurationObj.getRemoteLocation() == null)
- throw new RuntimeException("Remote file URL passed in DataDurationObj was NULL.");
-
- // mark computation end
- dataDurationObj.markEndTimeMillis();
-
- // create activity
- DataReceiveDurationDocument activity = DataReceiveDurationDocument.Factory.newInstance();
- DataTransferDurationType activityType = activity.addNewDataReceiveDuration();
-
- activityType.setId(dataObj.getId().toString());
- activityType.setDurationInMillis(dataDurationObj.getDurationMillis());
- activityType.setSizeInBytes(dataObj.getSizeInBytes());
-
- activityType.setSource(dataObj.getLocalLocation().toString());
-
- activityType.setTarget(dataDurationObj.getRemoteLocation().toString());
-
- sendNotification(context, activity, descriptionAndAnnotation,
- "[Data from " + dataDurationObj.getRemoteLocation() + " was received at " + dataObj.getLocalLocation()
- + "]");
-
- return dataDurationObj;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataDurationObj dataReceiveDuration(WorkflowTrackingContext context, URI dataID, URI remoteLocation,
- URI localLocation, int sizeInBytes, long durationMillis, String... descriptionAndAnnotation) {
-
- List<URI> locations = new ArrayList<URI>(2);
- locations.add(localLocation);
- locations.add(remoteLocation);
-
- DataObj dataObj = new DataObjImpl(dataID, locations, sizeInBytes);
- DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation, durationMillis);
-
- return dataReceiveFinished(context, dataDurationObj, descriptionAndAnnotation);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java
deleted file mode 100644
index d118345..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java
+++ /dev/null
@@ -1,824 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URI;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-
-import org.apache.airavata.workflow.tracking.ProvenanceNotifier;
-import org.apache.airavata.workflow.tracking.common.DataObj;
-import org.apache.airavata.workflow.tracking.common.InvocationContext;
-import org.apache.airavata.workflow.tracking.common.InvocationEntity;
-import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
-import org.apache.airavata.workflow.tracking.impl.state.DataObjImpl;
-import org.apache.airavata.workflow.tracking.impl.state.InvocationContextImpl;
-import org.apache.airavata.workflow.tracking.impl.state.InvocationEntityImpl;
-import org.apache.airavata.workflow.tracking.types.AcknowledgeFailureType;
-import org.apache.airavata.workflow.tracking.types.AcknowledgeSuccessType;
-import org.apache.airavata.workflow.tracking.types.BaseNotificationType;
-import org.apache.airavata.workflow.tracking.types.DataConsumedDocument;
-import org.apache.airavata.workflow.tracking.types.DataProducedDocument;
-import org.apache.airavata.workflow.tracking.types.DataProductNotificationType;
-import org.apache.airavata.workflow.tracking.types.DataProductType;
-import org.apache.airavata.workflow.tracking.types.FaultMessageType;
-import org.apache.airavata.workflow.tracking.types.FaultReceiverType;
-import org.apache.airavata.workflow.tracking.types.FaultResponderType;
-import org.apache.airavata.workflow.tracking.types.InvocationMessageType;
-import org.apache.airavata.workflow.tracking.types.InvokingServiceDocument;
-import org.apache.airavata.workflow.tracking.types.InvokingServiceFailedDocument;
-import org.apache.airavata.workflow.tracking.types.InvokingServiceSucceededDocument;
-import org.apache.airavata.workflow.tracking.types.ReceivedFaultDocument;
-import org.apache.airavata.workflow.tracking.types.ReceivedResultDocument;
-import org.apache.airavata.workflow.tracking.types.RequestInitiatorType;
-import org.apache.airavata.workflow.tracking.types.RequestReceiverType;
-import org.apache.airavata.workflow.tracking.types.ResultReceiverType;
-import org.apache.airavata.workflow.tracking.types.ResultResponderType;
-import org.apache.airavata.workflow.tracking.types.SendingFaultDocument;
-import org.apache.airavata.workflow.tracking.types.SendingResponseFailedDocument;
-import org.apache.airavata.workflow.tracking.types.SendingResponseSucceededDocument;
-import org.apache.airavata.workflow.tracking.types.SendingResultDocument;
-import org.apache.airavata.workflow.tracking.types.ServiceInitializedDocument;
-import org.apache.airavata.workflow.tracking.types.ServiceInvokedDocument;
-import org.apache.airavata.workflow.tracking.types.ServiceTerminatedDocument;
-import org.apache.airavata.workflow.tracking.types.WorkflowInitializedDocument;
-import org.apache.airavata.workflow.tracking.types.WorkflowInvokedDocument;
-import org.apache.airavata.workflow.tracking.types.WorkflowTerminatedDocument;
-import org.apache.xmlbeans.XmlObject;
-import org.apache.xmlbeans.XmlString;
-
-/**
- * DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING
- *
- * Utility to create and send Lead notification messages using new notification schema from a Workflow Engine
- *
- * The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
- * TIMESTEP, SERVICE_ID, ASYNC_PUB_MODE
- */
-public class ProvenanceNotifierImpl extends GenericNotifierImpl implements ProvenanceNotifier {
-
- private DataConsumedDocument dataConsumedBatchActivity;
- private DataProducedDocument dataProducedBatchActivity;
-
- // public ProvenanceNotifierImpl(ConstructorProps props) throws XMLStreamException, IOException {
- // super(props);
- // DATA_BATCHED = Boolean.parseBoolean((String)props.get(ENABLE_BATCH_PROVENANCE));
- // }
-
- public ProvenanceNotifierImpl() {
- super();
- }
-
- /**
- * this method allows us to override the default timestamp with a user supplied one
- *
- * @param msg
- * a BaseNotificationType
- * @param entity
- * an InvocationEntity
- *
- */
- // @Override
- // protected void setIDAndTimestamp(WorkfloBaseNotificationType msg, InvocationEntity entity) {
- // if(activityTimestamp == null)
- // super.setIDAndTimestamp(msg, entity);
- // else
- // super.setIDAndTimestamp(msg, entity, activityTimestamp);
- // }
-
- // protected void setIDAndTimestamp(BaseNotificationType msg, URI serviceID) {
- // setIDAndTimestamp(msg, createEntity(serviceID));
- // }
-
- protected InvocationEntity createEntity(URI serviceID) {
-
- return new InvocationEntityImpl(serviceID);
- }
-
- // /////////////////////////////////////////////////////////////////////////////
- // //
- // WORKFLOW NOTIFIER //
- // //
- // /////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- */
- public void workflowInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
- WorkflowInitializedDocument activity = WorkflowInitializedDocument.Factory.newInstance();
- activity.addNewWorkflowInitialized();
- // add timestamp and notification source; add description, and annotation if present
- sendNotification(context, activity, descriptionAndAnnotation, "[Workflow is initialized; ready to be invoked]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void workflowTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
- WorkflowTerminatedDocument activity = WorkflowTerminatedDocument.Factory.newInstance();
- BaseNotificationType activityType = activity.addNewWorkflowTerminated();
- sendNotification(context, activity, descriptionAndAnnotation,
- "[Workflow is terminated; cannot be invoked anymore]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
- String... descriptionAndAnnotation) {
- return workflowInvoked(context, initiator, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
- XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
-
- WorkflowInvokedDocument activity = WorkflowInvokedDocument.Factory.newInstance();
- RequestReceiverType activityType = activity.addNewWorkflowInvoked();
-
- // create the invocation context; set the initiator to the remote entity
- InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), initiator);
- if (initiator != null) {
- activityType.addNewInitiator().set(initiator.toBaseIDType());
- } else {
- logger.warn("Possible Error in context that was passed. "
- + "There was no remote invoker defined for workflow invoked (initiator=NULL)");
- }
-
- // add header and body fields
- if (header != null || body != null) {
- InvocationMessageType request = activityType.addNewRequest();
- if (header != null)
- request.addNewHeader().set(header);
- if (body != null)
- request.addNewBody().set(body);
- }
- sendNotification(context, activity, descriptionAndAnnotation, "[Workflow is invoked]");
- return invocationContext;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
- String... descriptionAndAnnotation) {
- return invokingService(context, receiver, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
- XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
-
- InvokingServiceDocument activity = InvokingServiceDocument.Factory.newInstance();
- RequestInitiatorType activityType = activity.addNewInvokingService();
-
- // create the invocation context; set the receiver to the remote entity
- InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), receiver);
- activityType.addNewReceiver().set(receiver.toBaseIDType());
-
- // add header and body fields
- if (header != null || body != null) {
- InvocationMessageType request = activityType.addNewRequest();
- if (header != null)
- request.addNewHeader().set(header);
- if (body != null)
- request.addNewBody().set(body);
- }
- sendNotification(context, activity, descriptionAndAnnotation, "[Service is invoked]");
- return invocationContext;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void invokingServiceSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- InvokingServiceSucceededDocument activity = InvokingServiceSucceededDocument.Factory.newInstance();
- AcknowledgeSuccessType activityType = activity.addNewInvokingServiceSucceeded();
-
- // set the remote entity as receiver
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. "
- + "there was no remote entity defined (requestReceiver=NULL)");
- }
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service finished successfully]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- invokingServiceFailed(wtcontext, context, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
- final InvokingServiceFailedDocument activity = InvokingServiceFailedDocument.Factory.newInstance();
- final AcknowledgeFailureType activityType = activity.addNewInvokingServiceFailed();
-
- // set the remote entity as receiver
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. "
- + "there was no remote entity defined (requestReceiver=NULL)");
- }
-
- // set stack trace if present
- if (trace != null) {
- final StringWriter sw = new StringWriter();
- trace.printStackTrace(new PrintWriter(sw));
-
- XmlString traceXmlStr = XmlString.Factory.newInstance();
- traceXmlStr.setStringValue(sw.toString());
- activityType.addNewFailure().addNewTrace().set(traceXmlStr);
- }
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service failed]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- receivedResult(wtcontext, context, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
- XmlObject body, String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- ReceivedResultDocument activity = ReceivedResultDocument.Factory.newInstance();
- ResultReceiverType activityType = activity.addNewReceivedResult();
-
- // set the responder to the remote entity
- if (context.getRemoteEntity() != null) {
- activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
- }
-
- // add header and body fields
- if (header != null || body != null) {
- InvocationMessageType result = activityType.addNewResult();
- if (header != null)
- result.addNewHeader().set(header);
- if (body != null)
- result.addNewBody().set(body);
- }
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service failed]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- receivedFault(wtcontext, context, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
- XmlObject faultBody, String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- ReceivedFaultDocument activity = ReceivedFaultDocument.Factory.newInstance();
- FaultReceiverType activityType = activity.addNewReceivedFault();
-
- // set the responder to the remote entity
- if (context.getRemoteEntity() != null) {
- activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
- }
-
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Fault is received for invocation ]");
- }
-
- // /////////////////////////////////////////////////////////////////////////////
- // //
- // SERVICE NOTIFIER //
- // //
- // /////////////////////////////////////////////////////////////////////////////
- /**
- * {@inheritDoc}
- *
- */
- public void serviceInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
-
- ServiceInitializedDocument activity = ServiceInitializedDocument.Factory.newInstance();
- activity.addNewServiceInitialized();
- sendNotification(context, activity, descriptionAndAnnotation, "[Service is initialized; ready to be invoked]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void serviceTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
-
- ServiceTerminatedDocument activity = ServiceTerminatedDocument.Factory.newInstance();
- activity.addNewServiceTerminated();
- sendNotification(context, activity, descriptionAndAnnotation,
- "[Service is terminated; cannot be invoked anymore]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
- String... descriptionAndAnnotation) {
- return serviceInvoked(context, initiator, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
- XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
-
- ServiceInvokedDocument activity = ServiceInvokedDocument.Factory.newInstance();
- RequestReceiverType activityType = activity.addNewServiceInvoked();
-
- // create the invocation context; set the initiator to the remote entity
- InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), initiator);
- if (initiator != null) {
- activityType.addNewInitiator().set(initiator.toBaseIDType());
- } else {
- logger.warn("Possible Error in context that was passed. "
- + "There was no remote invoker defined (initiator=NULL)");
- }
-
- // add header and body fields
- if (header != null || body != null) {
- InvocationMessageType request = activityType.addNewRequest();
- if (header != null)
- request.addNewHeader().set(header);
- if (body != null)
- request.addNewBody().set(body);
- }
-
- sendNotification(context, activity, descriptionAndAnnotation, "[Service is invoked]");
-
- return invocationContext;
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- sendingResult(wtcontext, context, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
- XmlObject body, String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- SendingResultDocument activity = SendingResultDocument.Factory.newInstance();
- ResultResponderType activityType = activity.addNewSendingResult();
-
- // set the receiver to the remote entity
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Possible Error in context that was passed. "
- + "There was no remote entity defined (responseReceiver=NULL)");
- }
-
- // add header and body fields
- if (header != null || body != null) {
- InvocationMessageType result = activityType.addNewResult();
- if (header != null)
- result.addNewHeader().set(header);
- if (body != null)
- result.addNewBody().set(body);
- }
-
- sendNotification(wtcontext, activity, descriptionAndAnnotation,
- "[Trying to send successful result of invocation]");
- }
-
- /**
- * {@inheritDoc}
- *
- *
- */
- public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- sendingFault(wtcontext, context, null, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
- XmlObject faultBody, String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- SendingFaultDocument activity = SendingFaultDocument.Factory.newInstance();
- FaultResponderType activityType = activity.addNewSendingFault();
-
- // set the receiver to the remote entity
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. "
- + "There was no remote entity defined (responseReceiver=NULL)");
- }
-
- // add header and body fields
- if (header != null || faultBody != null) {
- FaultMessageType result = activityType.addNewFault();
- if (header != null)
- result.addNewHeader().set(header);
- if (faultBody != null)
- result.addNewBody().set(faultBody);
- }
-
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Trying to sending fault from invocation]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingResponseSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- SendingResponseSucceededDocument activity = SendingResponseSucceededDocument.Factory.newInstance();
- AcknowledgeSuccessType activityType = activity.addNewSendingResponseSucceeded();
-
- // set the remote entity as receiver
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. "
- + "there was no remote entity defined (responseReceiver=NULL)");
- }
-
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Successfully sent response of invocation]");
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
- String... descriptionAndAnnotation) {
-
- sendingResponseFailed(wtcontext, context, null, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
- String... descriptionAndAnnotation) {
-
- if (context == null)
- throw new RuntimeException("Context passed was NULL.");
-
- SendingResponseFailedDocument activity = SendingResponseFailedDocument.Factory.newInstance();
- AcknowledgeFailureType activityType = activity.addNewSendingResponseFailed();
-
- // set the remote entity as receiver
- if (context.getRemoteEntity() != null) {
- activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
- } else {
- logger.warn("Error in context that was passed. "
- + "there was no remote entity defined (responseReceiver=NULL)");
- }
-
- // set stack trace if present
- if (trace != null) {
- final StringWriter sw = new StringWriter();
- trace.printStackTrace(new PrintWriter(sw));
-
- XmlString traceXmlStr = XmlString.Factory.newInstance();
- traceXmlStr.setStringValue(sw.toString());
- activityType.addNewFailure().addNewTrace().set(traceXmlStr);
- }
-
- sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Unable to send result of invocation]");
- }
-
- // /////////////////////////////////////////////////////////////////////////////
- // //
- // DATA PROVENANCE //
- // //
- // /////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- */
- public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations,
- String... descriptionAndAnnotation) {
-
- DataObj dataObj = new DataObjImpl(dataId, locations);
- return dataConsumed(context, dataObj, descriptionAndAnnotation);
- }
-
- public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
- String... descriptionAndAnnotation) {
-
- DataObj dataObj = new DataObjImpl(dataId, locations, sizeInBytes);
- return dataConsumed(context, dataObj, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataObj dataConsumed(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
- InvocationEntity entity = context.getMyself();
- if (entity == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (dataObj == null)
- throw new RuntimeException("Data object passed was NULL.");
- if (dataObj.getId() == null)
- throw new RuntimeException("Data object's ID was NULL.");
-
- DataConsumedDocument activity = DataConsumedDocument.Factory.newInstance();
- DataProductNotificationType activityType = activity.addNewDataConsumed();
-
- // set the data product to the consumed data
- DataProductType dataProduct = activityType.addNewDataProduct();
- // set data ID and size
- dataProduct.setId(dataObj.getId().toString());
- dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
- // set data URLs
- List<URI> locations = dataObj.getLocations();
- for (URI location : locations) {
- dataProduct.addLocation(location.toString());
- }
- // set data timestampp
- final Calendar cal = new GregorianCalendar();
- cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
- dataProduct.setTimestamp(cal);
-
- sendNotification(context, activity, descriptionAndAnnotation, "[consumed: ID=<" + dataObj.getId().toString()
- + ">; URL=<#" + locations.size() + "><" + (locations.size() > 0 ? locations.get(0) : "") + ">]");
-
- return dataObj;
- }
-
- // /**
- // * Adds the file/directory was used by this invocation to the current dataConsuemd
- // * notification batch. If the notification batch did not exist, it is created. The notification
- // * is not sent until {@link #flush()} is called.
- // *
- // * @param entity identity of the workflow/service's invocation that consumed this file
- // * @param dataObj data object recording the dataId, local/remote URLs, timestamp of
- // * the file/dir, that was returned by another data notification method
- // * @param descriptionAndAnnotation optional vararg. The first element is used as the
- // * human readable description for this notification. The subsequent strings need to be
- // * serialized XML fragments that are added as annotation to the notification.
- // *
- // * @return the data object passed to this method with file/dir size filled in if not
- // * already when passed.
- // *
- // */
- // protected DataObj dataConsumedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
- // String...descriptionAndAnnotation) {
- //
- // if(entity == null) throw new RuntimeException("Local entity passed was NULL.");
- // if(dataObj == null) throw new RuntimeException("Data object passed was NULL.");
- // if(dataObj.getId() == null) throw new RuntimeException("Data object's ID was NULL.");
- //
- // if (dataConsumedBatchActivity == null) {
- //
- // // create initial consumed notification container
- // dataConsumedBatchActivity = DataConsumedDocument.Factory.newInstance();
- // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
- //
- //
- // }
- //
- // // get existing consumed notification container
- // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
- //
- // // add nre data product to the consumed data
- // DataProductType dataProduct = activityType.addNewDataProduct();
- // // set data ID and size
- // dataProduct.setId(dataObj.getId().toString());
- // dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
- // // set data URLs
- // List<URI> locations = dataObj.getLocations();
- // for(URI location : locations){
- // dataProduct.addLocation(location.toString());
- // }
- // // set data timestampp
- // final Calendar cal = new GregorianCalendar();
- // cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
- // dataProduct.setTimestamp(cal);
- //
- // sendNotification(context, activityType, descriptionAndAnnotation,
- // "[consumed: ID=<" + dataObj.getId().toString() +
- // ">; URL=<#" + locations.size() + "><" +
- // (locations.size() > 0 ? locations.get(0) : "") +
- // ">]"
- // );
- //
- // return dataObj;
- // }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations,
- String... descriptionAndAnnotation) {
-
- DataObj dataObj = new DataObjImpl(dataId, locations);
- return dataProduced(context, dataObj, descriptionAndAnnotation);
- }
-
- public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
- String... descriptionAndAnnotation) {
-
- DataObj dataObj = new DataObjImpl(dataId, locations, sizeInBytes);
- return dataProduced(context, dataObj, descriptionAndAnnotation);
- }
-
- /**
- * {@inheritDoc}
- *
- */
- public DataObj dataProduced(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
- InvocationEntity entity = context.getMyself();
- if (entity == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (dataObj == null)
- throw new RuntimeException("Data object passed was NULL.");
- if (dataObj.getId() == null)
- throw new RuntimeException("Data object's ID was NULL.");
-
- DataProducedDocument activity = DataProducedDocument.Factory.newInstance();
- DataProductNotificationType activityType = activity.addNewDataProduced();
-
- // set the data product to the produced data
- DataProductType dataProduct = activityType.addNewDataProduct();
- // set data ID and size
- dataProduct.setId(dataObj.getId().toString());
- dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
- // set data URLs
- List<URI> locations = dataObj.getLocations();
- for (URI location : locations) {
- dataProduct.addLocation(location.toString());
- }
- // set data timestampp
- final Calendar cal = new GregorianCalendar();
- cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
- dataProduct.setTimestamp(cal);
-
- sendNotification(context, activity, descriptionAndAnnotation, "[produced: ID=<" + dataObj.getId().toString()
- + ">; URL=<#" + locations.size() + "><" + (locations.size() > 0 ? locations.get(0) : "") + ">]");
-
- return dataObj;
- }
-
- /**
- * Adds the file/directory was used by this invocation to the current dataProduced notification batch. If the
- * notification batch did not exist, it is created. The notification is not sent untill {@link #flush()} is called.
- *
- * @param entity
- * identity of the workflow/service's invocation that produced this file
- * @param dataObj
- * data object recording the dataId, local/remote URLs, timestamp of the file/dir, that was returned by
- * another data notification method
- * @param descriptionAndAnnotation
- * optional vararg. The first element is used as the human readable description for this notification.
- * The subsequent strings need to be serialized XML fragments that are added as annotation to the
- * notification.
- *
- * @return the data object passed to this method with file/dir size filled in if not already when passed.
- *
- */
- protected DataObj dataProducedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
- String... descriptionAndAnnotation) {
-
- if (entity == null)
- throw new RuntimeException("Local entity passed was NULL.");
- if (dataObj == null)
- throw new RuntimeException("Data object passed was NULL.");
- if (dataObj.getId() == null)
- throw new RuntimeException("Data object's ID was NULL.");
-
- if (dataProducedBatchActivity == null) {
-
- // create initial produced notification container
- dataProducedBatchActivity = DataProducedDocument.Factory.newInstance();
- }
-
- // get existing produced notification container
- DataProductNotificationType activityType = dataProducedBatchActivity.addNewDataProduced();
-
- // add new data product to the produced data
- DataProductType dataProduct = activityType.addNewDataProduct();
- // set data ID and size
- dataProduct.setId(dataObj.getId().toString());
- dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
- // set data URLs
- List<URI> locations = dataObj.getLocations();
- for (URI location : locations) {
- dataProduct.addLocation(location.toString());
- }
- // set data timestamp
- final Calendar cal = new GregorianCalendar();
- cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
- dataProduct.setTimestamp(cal);
-
- // add description, and annotation to DATA PRODUCT if present
- sendNotification(context, dataProducedBatchActivity, descriptionAndAnnotation, "[produced: ID=<"
- + dataObj.getId().toString() + ">; URL=<#" + locations.size() + "><"
- + (locations.size() > 0 ? locations.get(0) : "") + ">]");
-
- return dataObj;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
deleted file mode 100644
index f1abcc2..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.publish;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.airavata.workflow.tracking.util.LinkedMessageQueue;
-import org.apache.airavata.workflow.tracking.util.Timer;
-import org.apache.xmlbeans.XmlObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract method to publish messages in sync or async mode. In async mode, the messages are kept in an in-memory queue
- * and published. Calling flush() blocks till all messages are sent and the queue is empty. In sync mode, the call
- * blocks till the message is transmitted.
- */
-public abstract class AbstractPublisher implements Runnable, NotificationPublisher {
-
- protected static final Logger logger = LoggerFactory.getLogger(AbstractPublisher.class);
- protected static final boolean IS_LOG_FINEST = logger.isDebugEnabled();
- private final LinkedMessageQueue<BrokerEntry> messageQueue;
- protected static final boolean IS_TIMER = Boolean.getBoolean("ENABLE_TIMER");
- protected static final Timer notifTP = Timer.init("PubNotif");
-
- private boolean finished = false;
- private final Lock LOCK = new ReentrantLock();
- private final Condition CONDITION = LOCK.newCondition();
- private static int PUB_ID = 0;
-
- private final boolean IS_DEFAULT_MODE_ASYNC;
-
- private boolean deleted = false;
-
- private boolean deleteNow = false;
-
- private final Thread pubThread;
-
- protected AbstractPublisher(int capacity, boolean defaultAsync) {
-
- messageQueue = new LinkedMessageQueue<BrokerEntry>(capacity);
- IS_DEFAULT_MODE_ASYNC = defaultAsync;
- deleted = false;
- pubThread = new Thread(this, "PUBLISHER #" + PUB_ID++);
- pubThread.setDaemon(true);
- pubThread.start();
- }
-
- // public abstract void publishSync(String leadMessage);
- public final void delete() {
- deleted = true;
- messageQueue.setCanStop(true);
- deleteNow = true;
- }
-
- public final boolean isDeleted() {
- return deleted;
- }
-
- public final void publish(String leadMessage) {
-
- if (IS_DEFAULT_MODE_ASYNC) {
- publishAsync(leadMessage);
- } else {
- publishSync(leadMessage);
- }
- }
-
- public final void publish(XmlObject xmlMessage) {
-
- if (IS_DEFAULT_MODE_ASYNC) {
- publishAsync(xmlMessage);
- } else {
- publishSync(xmlMessage.xmlText());
- }
- }
-
- public final void publishAsync(String leadMessage) {
-
- if (IS_LOG_FINEST) {
- logger.debug("ASYNC: adding to queue, notification: " + leadMessage);
- }
- final BrokerEntry brokerEntry = new BrokerEntry(leadMessage);
- try {
- messageQueue.put(brokerEntry);
- } catch (InterruptedException e) {
- throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
- }
- }
-
- public final void publishAsync(XmlObject xmlMessage) {
-
- if (IS_LOG_FINEST) {
- logger.debug("ASYNC: adding to queue, notification: " + xmlMessage);
- }
-
- final BrokerEntry brokerEntry = new BrokerEntry(xmlMessage);
- try {
- messageQueue.put(brokerEntry);
- } catch (InterruptedException e) {
- throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
- }
- }
-
- public final void publishSync(XmlObject xmlMessage) {
-
- if (IS_LOG_FINEST) {
- logger.debug("SYNC: sending notification: " + xmlMessage);
- }
- publishSync(xmlMessage.xmlText());
- }
-
- public final void flush() {
-
- finished = true;
- LOCK.lock();
- while (messageQueue.size() > 0) {
- try {
- // wait to be signalled that all messages were sent...
- CONDITION.await();
- } catch (InterruptedException e) {
- throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
- }
- }
- finished = false;
- CONDITION.signal(); // send ack...
- LOCK.unlock();
- return;
- }
-
- public final void run() {
-
- BrokerEntry brokerEntry = null;
- while (deleteNow) {
-
- try {
- // get the head from queue, but dont remove it yet
- // block for message to arrive only if not finished;
- // if finished, dont block...just quit
- brokerEntry = finished ? messageQueue.peek() : messageQueue.get();
- if(deleteNow){
- break;
- }
- if (brokerEntry == null) {
-
- // the queue has been flushed
- if (finished) {
- LOCK.lock();
- CONDITION.signal(); // signal flushed queue...
- try {
- CONDITION.await(); // and wait for ack.
- } catch (InterruptedException e) {
- throw e;
- }
- LOCK.unlock();
- } else { /* ignore...this should not happen */
- }
-
- // go back to to start and wait for new message in flushed queue...
- continue;
-
- } else {
-
- if (IS_LOG_FINEST) {
- logger.debug("ASYNC: sending notification: " + brokerEntry.getMessage());
- }
-
- // publish message
- publishSync(brokerEntry.getMessage());
-
- // remove the published head from queue
- messageQueue.poll();
- }
-
- } catch (InterruptedException e) {
- if (deleted)
- break;
- else
- logger.error("Interrupted when queue size: " + messageQueue.size() + ". deleted == false", e);
- } catch (RuntimeException e) {
-
- logger.error("Runtime Error: " + e.getMessage());
- if (logger.isDebugEnabled()) {
- logger.debug("Runtime Error at message: "
- + (brokerEntry != null ? brokerEntry.getMessage() : "NULL") + "; queue size: "
- + messageQueue.size(), e);
- }
- // fixme: we should remove the entry from queue if it cannot be sent...
- // otherwise, if broker is down, this may cause an infinite loop!!!
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java
deleted file mode 100644
index 9966fb9..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.publish;
-
-import org.apache.xmlbeans.XmlObject;
-
-final class BrokerEntry {
-
- public BrokerEntry(String message) {
- this.message = message;
- this.xml = null;
- }
-
- public BrokerEntry(XmlObject xml) {
- this.xml = xml;
- this.message = null;
- }
-
- public final String getMessage() {
- if (message == null)
- return xml.xmlText();
- else
- return message;
- }
-
- public final XmlObject getXmlObject() {
- return xml;
- }
-
- private final String message;
- private final XmlObject xml;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java
deleted file mode 100644
index cbe61c1..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.publish;
-
-import java.io.PrintStream;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-
-import org.apache.airavata.workflow.tracking.client.Callback;
-import org.apache.airavata.workflow.tracking.client.NotificationType;
-import org.apache.airavata.workflow.tracking.common.ConstructorConsts;
-import org.apache.airavata.workflow.tracking.common.ConstructorProps;
-import org.apache.airavata.workflow.tracking.types.BaseIDType;
-import org.apache.airavata.workflow.tracking.types.BaseNotificationType;
-import org.apache.airavata.workflow.tracking.types.LogInfoDocument;
-import org.apache.airavata.workflow.tracking.types.PublishURLDocument;
-import org.apache.airavata.workflow.tracking.util.MessageUtil;
-import org.apache.xmlbeans.XmlException;
-import org.apache.xmlbeans.XmlObject;
-
-/**
- * acts as a workflow tracking NotificationPublisher that calls a workflow tracking callback listener without having to
- * pass though a real notification broker. Default listener prints to stdout
- */
-public class LoopbackPublisher extends AbstractPublisher implements NotificationPublisher {
-
- private Callback listener;
- private String topic;
- private static int globalCount = 0;
-
- public LoopbackPublisher(Callback listener_, String topic_) {
- super(10, false); // capacity, async
- topic = topic_;
- listener = listener_;
- if (listener == null) {
- listener = new Callback() {
- int count = 0;
-
- public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
-
- System.out
- .printf("----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
- count, globalCount, topic, notificationType, messageObj);
- count++;
- globalCount++;
- }
- };
- }
- }
-
- public LoopbackPublisher(final PrintStream out_, String topic_) {
- this(new Callback() {
- int count = 0;
-
- public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
-
- out_.printf(
- "----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
- count, globalCount, topic, notificationType, messageObj);
- count++;
- globalCount++;
- }
- }, topic_);
- }
-
- public LoopbackPublisher(String topic_) {
- this(System.out, topic_);
- }
-
- public LoopbackPublisher(ConstructorProps props) {
- this((Callback) props.get(ConstructorConsts.CALLBACK_LISTENER), (String) props.get(ConstructorConsts.TOPIC));
- }
-
- /**
- * Method publishSync
- *
- * @param message
- * a String message that should be a valid XML String (serialized XML document)
- *
- */
- public void publishSync(String message) {
-
- try {
- XmlObject xmlMessage = XmlObject.Factory.parse(message);
- NotificationType type = MessageUtil.getType(xmlMessage);
- listener.deliverMessage(topic, type, xmlMessage);
- } catch (XmlException e) {
- logger.error("Error parsing workflow tracking message : [" + message + "]\n" + "as an XML Object", e);
- }
- }
-
- public static void main(String args[]) {
-
- LoopbackPublisher publisher = new LoopbackPublisher(new Callback() {
- int count = 0;
-
- public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
-
- System.out.printf("----\nReceived Message [%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
- count++, topic, notificationType, messageObj);
- }
- }, "testTopic");
-
- // create & publish log message
- {
- LogInfoDocument logMsg = LogInfoDocument.Factory.newInstance();
- BaseNotificationType log = logMsg.addNewLogInfo();
- // add timestamp
- final Calendar cal = new GregorianCalendar();
- cal.setTime(new Date());
- log.setTimestamp(cal);
- // add notification source
- BaseIDType baseID = BaseIDType.Factory.newInstance();
- baseID.setServiceID("http://tempuri.org/test_service");
- log.addNewNotificationSource().set(baseID);
- // add description
- log.setDescription("A test message");
-
- // publish message as XML Object
- publisher.publish(logMsg);
- }
- // create & publish publishURl message
- {
- // create publish URL message
- PublishURLDocument pubMsg = PublishURLDocument.Factory.newInstance();
- PublishURLDocument.PublishURL pub = pubMsg.addNewPublishURL();
- // add timestamp
- final Calendar cal = new GregorianCalendar();
- cal.setTime(new Date());
- pub.setTimestamp(cal);
- // add notification source
- BaseIDType baseID = BaseIDType.Factory.newInstance();
- baseID.setServiceID("http://tempuri.org/test_service");
- pub.addNewNotificationSource().set(baseID);
- pub.setTitle("Some URL's Title");
- pub.setLocation("http://tempuri.org/published_url");
-
- // publish message as XML string
- publisher.publish(pubMsg.xmlText());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java
deleted file mode 100644
index e41f275..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.publish;
-
-import org.apache.xmlbeans.XmlObject;
-
-public interface NotificationPublisher {
-
- public void publish(String leadMessage);
-
- public void publish(XmlObject xmlMessage);
-
- public void publishSync(String leadMessage);
-
- public void publishSync(XmlObject xmlMessage);
-
- public void publishAsync(String leadMessage);
-
- public void publishAsync(XmlObject xmlMessage);
-
- public void flush();
-
- public void delete();
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/42f77edb/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java b/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java
deleted file mode 100644
index a8cffb2..0000000
--- a/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.tracking.impl.publish;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.commons.WorkFlowUtils;
-import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
-import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
-import org.apache.airavata.wsmg.client.MsgBrokerClientException;
-import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Publish WS-Eventing messages using WS-Messenger client API
- *
- */
-public class WSMPublisher extends AbstractPublisher implements NotificationPublisher {
-
- protected final WseMsgBrokerClient broker;
- protected final EndpointReference brokerEpr;
- protected Properties configs = new Properties();
-
- private Logger log = LoggerFactory.getLogger(WSMPublisher.class);
-
- public WSMPublisher(WorkflowTrackingContext context) {
- this(10, context.isEnableAsyncPublishing(), context.getBrokerEpr());
- }
-
- public WSMPublisher(int capacity, boolean defaultAsync, String brokerLoc, String topic) throws IOException {
- super(capacity, defaultAsync);
- broker = new WseMsgBrokerClient();
- brokerEpr = broker.createEndpointReference(brokerLoc, topic);
- broker.init(brokerEpr.getAddress());
- }
-
- public WSMPublisher(int capacity, boolean defaultAsync, EndpointReference brokerEpr_)
- throws WorkflowTrackingException {
- super(capacity, defaultAsync);
- try {
- brokerEpr = brokerEpr_;
- broker = new WseMsgBrokerClient();
- broker.init(brokerEpr_.getAddress());
- } catch (Exception e) {
- throw new WorkflowTrackingException(e);
- }
- }
-
- public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_) throws IOException {
-
- this(capacity, defaultAsync, brokerEpr_, false);
-
- }
-
- public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_, boolean isXmlEpr) throws IOException {
- super(capacity, defaultAsync);
- if (!isXmlEpr) {
- brokerEpr = new EndpointReference(brokerEpr_);// EndpointReferenceHelper.fro(brokerEpr_);
-
- } else {
- brokerEpr = EndpointReferenceHelper.fromString(brokerEpr_);
- }
-
- broker = new WseMsgBrokerClient();
- broker.init(brokerEpr.getAddress());
- }
-
- /**
- * Method publishSync
- *
- * @param leadMessage
- * a String
- *
- */
- public void publishSync(String leadMessage) {
- if (isDeleted())
- throw new RuntimeException("Publisher has been deleted!");
- if (IS_LOG_FINEST) {
- logger.debug("publishing notification to messenger broker: " + leadMessage);
- }
- try {
- OMElement msg = WorkFlowUtils.reader2OMElement(new StringReader(leadMessage));
- broker.publish(null, msg);
-
- } catch (MsgBrokerClientException e) {
- log.error("unablet to publish the lead message", e);
- } catch (XMLStreamException e) {
- log.error("unable to parse the load message - " + leadMessage, e);
- }
- }
-
-}