You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/07/03 20:33:03 UTC

svn commit: r1142475 [4/7] - in /incubator/airavata/ws-messaging/trunk/workflow-tracking: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/airavata/ src/main/java/org/apache/airavata/comm...

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,795 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URI;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.airavata.workflow.tracking.ProvenanceNotifier;
+import org.apache.airavata.workflow.tracking.common.DataObj;
+import org.apache.airavata.workflow.tracking.common.InvocationContext;
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
+import org.apache.airavata.workflow.tracking.impl.state.DataObjImpl;
+import org.apache.airavata.workflow.tracking.impl.state.InvocationContextImpl;
+import org.apache.airavata.workflow.tracking.impl.state.InvocationEntityImpl;
+import org.apache.xmlbeans.XmlObject;
+import org.apache.xmlbeans.XmlString;
+
+/**
+ * DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING
+ * 
+ * Utility to create and send Lead notification messages using new notification schema from a Workflow Engine
+ * 
+ * The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
+ * TIMESTEP, SERVICE_ID, ASYNC_PUB_MODE
+ */
+public class ProvenanceNotifierImpl extends GenericNotifierImpl implements ProvenanceNotifier {
+
+    private DataConsumedDocument dataConsumedBatchActivity;
+    private DataProducedDocument dataProducedBatchActivity;
+
+    // public ProvenanceNotifierImpl(ConstructorProps props) throws XMLStreamException, IOException {
+    // super(props);
+    // DATA_BATCHED = Boolean.parseBoolean((String)props.get(ENABLE_BATCH_PROVENANCE));
+    // }
+
+    public ProvenanceNotifierImpl() {
+        super();
+    }
+
+    /**
+     * this method allows us to override the default timestamp with a user supplied one
+     * 
+     * @param msg
+     *            a BaseNotificationType
+     * @param entity
+     *            an InvocationEntity
+     * 
+     */
+    // @Override
+    // protected void setIDAndTimestamp(WorkfloBaseNotificationType msg, InvocationEntity entity) {
+    // if(activityTimestamp == null)
+    // super.setIDAndTimestamp(msg, entity);
+    // else
+    // super.setIDAndTimestamp(msg, entity, activityTimestamp);
+    // }
+
+    // protected void setIDAndTimestamp(BaseNotificationType msg, URI serviceID) {
+    // setIDAndTimestamp(msg, createEntity(serviceID));
+    // }
+
+    protected InvocationEntity createEntity(URI serviceID) {
+
+        return new InvocationEntityImpl(serviceID);
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    // //
+    // WORKFLOW NOTIFIER //
+    // //
+    // /////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void workflowInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+        WorkflowInitializedDocument activity = WorkflowInitializedDocument.Factory.newInstance();
+        activity.addNewWorkflowInitialized();
+        // add timestamp and notification source; add description, and annotation if present
+        sendNotification(context, activity, descriptionAndAnnotation, "[Workflow is initialized; ready to be invoked]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void workflowTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+        WorkflowTerminatedDocument activity = WorkflowTerminatedDocument.Factory.newInstance();
+        BaseNotificationType activityType = activity.addNewWorkflowTerminated();
+        sendNotification(context, activity, descriptionAndAnnotation,
+                "[Workflow is terminated; cannot be invoked anymore]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+            String... descriptionAndAnnotation) {
+        return workflowInvoked(context, initiator, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+            XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+        WorkflowInvokedDocument activity = WorkflowInvokedDocument.Factory.newInstance();
+        RequestReceiverType activityType = activity.addNewWorkflowInvoked();
+
+        // create the invocation context; set the initiator to the remote entity
+        InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), initiator);
+        if (initiator != null) {
+            activityType.addNewInitiator().set(initiator.toBaseIDType());
+        } else {
+            logger.warn("Possible Error in context that was passed. "
+                    + "There was no remote invoker defined for workflow invoked (initiator=NULL)");
+        }
+
+        // add header and body fields
+        if (header != null || body != null) {
+            InvocationMessageType request = activityType.addNewRequest();
+            if (header != null)
+                request.addNewHeader().set(header);
+            if (body != null)
+                request.addNewBody().set(body);
+        }
+        sendNotification(context, activity, descriptionAndAnnotation, "[Workflow is invoked]");
+        return invocationContext;
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
+            String... descriptionAndAnnotation) {
+        return invokingService(context, receiver, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
+            XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+        InvokingServiceDocument activity = InvokingServiceDocument.Factory.newInstance();
+        RequestInitiatorType activityType = activity.addNewInvokingService();
+
+        // create the invocation context; set the receiver to the remote entity
+        InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), receiver);
+        activityType.addNewReceiver().set(receiver.toBaseIDType());
+
+        // add header and body fields
+        if (header != null || body != null) {
+            InvocationMessageType request = activityType.addNewRequest();
+            if (header != null)
+                request.addNewHeader().set(header);
+            if (body != null)
+                request.addNewBody().set(body);
+        }
+        sendNotification(context, activity, descriptionAndAnnotation, "[Service is invoked]");
+        return invocationContext;
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void invokingServiceSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        InvokingServiceSucceededDocument activity = InvokingServiceSucceededDocument.Factory.newInstance();
+        AcknowledgeSuccessType activityType = activity.addNewInvokingServiceSucceeded();
+
+        // set the remote entity as receiver
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. "
+                    + "there was no remote entity defined (requestReceiver=NULL)");
+        }
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service finished successfully]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        invokingServiceFailed(wtcontext, context, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
+            String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        final InvokingServiceFailedDocument activity = InvokingServiceFailedDocument.Factory.newInstance();
+        final AcknowledgeFailureType activityType = activity.addNewInvokingServiceFailed();
+
+        // set the remote entity as receiver
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. "
+                    + "there was no remote entity defined (requestReceiver=NULL)");
+        }
+
+        // set stack trace if present
+        if (trace != null) {
+            final StringWriter sw = new StringWriter();
+            trace.printStackTrace(new PrintWriter(sw));
+
+            XmlString traceXmlStr = XmlString.Factory.newInstance();
+            traceXmlStr.setStringValue(sw.toString());
+            activityType.addNewFailure().addNewTrace().set(traceXmlStr);
+        }
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service failed]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        receivedResult(wtcontext, context, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+            XmlObject body, String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        ReceivedResultDocument activity = ReceivedResultDocument.Factory.newInstance();
+        ResultReceiverType activityType = activity.addNewReceivedResult();
+
+        // set the responder to the remote entity
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
+        }
+
+        // add header and body fields
+        if (header != null || body != null) {
+            InvocationMessageType result = activityType.addNewResult();
+            if (header != null)
+                result.addNewHeader().set(header);
+            if (body != null)
+                result.addNewBody().set(body);
+        }
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Service failed]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        receivedFault(wtcontext, context, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+            XmlObject faultBody, String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        ReceivedFaultDocument activity = ReceivedFaultDocument.Factory.newInstance();
+        FaultReceiverType activityType = activity.addNewReceivedFault();
+
+        // set the responder to the remote entity
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
+        }
+
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Fault is received for invocation ]");
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    // //
+    // SERVICE NOTIFIER //
+    // //
+    // /////////////////////////////////////////////////////////////////////////////
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void serviceInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+
+        ServiceInitializedDocument activity = ServiceInitializedDocument.Factory.newInstance();
+        activity.addNewServiceInitialized();
+        sendNotification(context, activity, descriptionAndAnnotation, "[Service is initialized; ready to be invoked]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void serviceTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+
+        ServiceTerminatedDocument activity = ServiceTerminatedDocument.Factory.newInstance();
+        activity.addNewServiceTerminated();
+        sendNotification(context, activity, descriptionAndAnnotation,
+                "[Service is terminated; cannot be invoked anymore]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+            String... descriptionAndAnnotation) {
+        return serviceInvoked(context, initiator, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+            XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+        ServiceInvokedDocument activity = ServiceInvokedDocument.Factory.newInstance();
+        RequestReceiverType activityType = activity.addNewServiceInvoked();
+
+        // create the invocation context; set the initiator to the remote entity
+        InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), initiator);
+        if (initiator != null) {
+            activityType.addNewInitiator().set(initiator.toBaseIDType());
+        } else {
+            logger.warn("Possible Error in context that was passed. "
+                    + "There was no remote invoker defined (initiator=NULL)");
+        }
+
+        // add header and body fields
+        if (header != null || body != null) {
+            InvocationMessageType request = activityType.addNewRequest();
+            if (header != null)
+                request.addNewHeader().set(header);
+            if (body != null)
+                request.addNewBody().set(body);
+        }
+
+        sendNotification(context, activity, descriptionAndAnnotation, "[Service is invoked]");
+
+        return invocationContext;
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        sendingResult(wtcontext, context, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+            XmlObject body, String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        SendingResultDocument activity = SendingResultDocument.Factory.newInstance();
+        ResultResponderType activityType = activity.addNewSendingResult();
+
+        // set the receiver to the remote entity
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Possible Error in context that was passed. "
+                    + "There was no remote entity defined (responseReceiver=NULL)");
+        }
+
+        // add header and body fields
+        if (header != null || body != null) {
+            InvocationMessageType result = activityType.addNewResult();
+            if (header != null)
+                result.addNewHeader().set(header);
+            if (body != null)
+                result.addNewBody().set(body);
+        }
+
+        sendNotification(wtcontext, activity, descriptionAndAnnotation,
+                "[Trying to send successful result of invocation]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     * 
+     */
+    public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        sendingFault(wtcontext, context, null, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+            XmlObject faultBody, String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        SendingFaultDocument activity = SendingFaultDocument.Factory.newInstance();
+        FaultResponderType activityType = activity.addNewSendingFault();
+
+        // set the receiver to the remote entity
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. "
+                    + "There was no remote entity defined (responseReceiver=NULL)");
+        }
+
+        // add header and body fields
+        if (header != null || faultBody != null) {
+            FaultMessageType result = activityType.addNewFault();
+            if (header != null)
+                result.addNewHeader().set(header);
+            if (faultBody != null)
+                result.addNewBody().set(faultBody);
+        }
+
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Trying to sending fault from invocation]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingResponseSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        SendingResponseSucceededDocument activity = SendingResponseSucceededDocument.Factory.newInstance();
+        AcknowledgeSuccessType activityType = activity.addNewSendingResponseSucceeded();
+
+        // set the remote entity as receiver
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. "
+                    + "there was no remote entity defined (responseReceiver=NULL)");
+        }
+
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Successfully sent response of invocation]");
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
+            String... descriptionAndAnnotation) {
+
+        sendingResponseFailed(wtcontext, context, null, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
+            String... descriptionAndAnnotation) {
+
+        if (context == null)
+            throw new RuntimeException("Context passed was NULL.");
+
+        SendingResponseFailedDocument activity = SendingResponseFailedDocument.Factory.newInstance();
+        AcknowledgeFailureType activityType = activity.addNewSendingResponseFailed();
+
+        // set the remote entity as receiver
+        if (context.getRemoteEntity() != null) {
+            activityType.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+        } else {
+            logger.warn("Error in context that was passed. "
+                    + "there was no remote entity defined (responseReceiver=NULL)");
+        }
+
+        // set stack trace if present
+        if (trace != null) {
+            final StringWriter sw = new StringWriter();
+            trace.printStackTrace(new PrintWriter(sw));
+
+            XmlString traceXmlStr = XmlString.Factory.newInstance();
+            traceXmlStr.setStringValue(sw.toString());
+            activityType.addNewFailure().addNewTrace().set(traceXmlStr);
+        }
+
+        sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Unable to send result of invocation]");
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    // //
+    // DATA PROVENANCE //
+    // //
+    // /////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations,
+            String... descriptionAndAnnotation) {
+
+        DataObj dataObj = new DataObjImpl(dataId, locations);
+        return dataConsumed(context, dataObj, descriptionAndAnnotation);
+    }
+
+    public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
+            String... descriptionAndAnnotation) {
+
+        DataObj dataObj = new DataObjImpl(dataId, locations, sizeInBytes);
+        return dataConsumed(context, dataObj, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public DataObj dataConsumed(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
+        InvocationEntity entity = context.getMyself();
+        if (entity == null)
+            throw new RuntimeException("Local entity passed was NULL.");
+        if (dataObj == null)
+            throw new RuntimeException("Data object passed was NULL.");
+        if (dataObj.getId() == null)
+            throw new RuntimeException("Data object's ID was NULL.");
+
+        DataConsumedDocument activity = DataConsumedDocument.Factory.newInstance();
+        DataProductNotificationType activityType = activity.addNewDataConsumed();
+
+        // set the data product to the consumed data
+        DataProductType dataProduct = activityType.addNewDataProduct();
+        // set data ID and size
+        dataProduct.setId(dataObj.getId().toString());
+        dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+        // set data URLs
+        List<URI> locations = dataObj.getLocations();
+        for (URI location : locations) {
+            dataProduct.addLocation(location.toString());
+        }
+        // set data timestampp
+        final Calendar cal = new GregorianCalendar();
+        cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+        dataProduct.setTimestamp(cal);
+
+        sendNotification(context, activity, descriptionAndAnnotation, "[consumed: ID=<" + dataObj.getId().toString()
+                + ">; URL=<#" + locations.size() + "><" + (locations.size() > 0 ? locations.get(0) : "") + ">]");
+
+        return dataObj;
+    }
+
+    // /**
+    // * Adds the file/directory was used by this invocation to the current dataConsuemd
+    // * notification batch. If the notification batch did not exist, it is created. The notification
+    // * is not sent until {@link #flush()} is called.
+    // *
+    // * @param entity identity of the workflow/service's invocation that consumed this file
+    // * @param dataObj data object recording the dataId, local/remote URLs, timestamp of
+    // * the file/dir, that was returned by another data notification method
+    // * @param descriptionAndAnnotation optional vararg. The first element is used as the
+    // * human readable description for this notification. The subsequent strings need to be
+    // * serialized XML fragments that are added as annotation to the notification.
+    // *
+    // * @return the data object passed to this method with file/dir size filled in if not
+    // * already when passed.
+    // *
+    // */
+    // protected DataObj dataConsumedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
+    // String...descriptionAndAnnotation) {
+    //
+    // if(entity == null) throw new RuntimeException("Local entity passed was NULL.");
+    // if(dataObj == null) throw new RuntimeException("Data object passed was NULL.");
+    // if(dataObj.getId() == null) throw new RuntimeException("Data object's ID was NULL.");
+    //
+    // if (dataConsumedBatchActivity == null) {
+    //
+    // // create initial consumed notification container
+    // dataConsumedBatchActivity = DataConsumedDocument.Factory.newInstance();
+    // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
+    //
+    //
+    // }
+    //
+    // // get existing consumed notification container
+    // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
+    //
+    // // add nre data product to the consumed data
+    // DataProductType dataProduct = activityType.addNewDataProduct();
+    // // set data ID and size
+    // dataProduct.setId(dataObj.getId().toString());
+    // dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+    // // set data URLs
+    // List<URI> locations = dataObj.getLocations();
+    // for(URI location : locations){
+    // dataProduct.addLocation(location.toString());
+    // }
+    // // set data timestampp
+    // final Calendar cal = new GregorianCalendar();
+    // cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+    // dataProduct.setTimestamp(cal);
+    //
+    // sendNotification(context, activityType, descriptionAndAnnotation,
+    // "[consumed: ID=<" + dataObj.getId().toString() +
+    // ">; URL=<#" + locations.size() + "><" +
+    // (locations.size() > 0 ? locations.get(0) : "") +
+    // ">]"
+    // );
+    //
+    // return dataObj;
+    // }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations,
+            String... descriptionAndAnnotation) {
+
+        DataObj dataObj = new DataObjImpl(dataId, locations);
+        return dataProduced(context, dataObj, descriptionAndAnnotation);
+    }
+
+    public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
+            String... descriptionAndAnnotation) {
+
+        DataObj dataObj = new DataObjImpl(dataId, locations, sizeInBytes);
+        return dataProduced(context, dataObj, descriptionAndAnnotation);
+    }
+
+    /**
+     * {@inheritDoc}
+     * 
+     */
+    public DataObj dataProduced(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
+        InvocationEntity entity = context.getMyself();
+        if (entity == null)
+            throw new RuntimeException("Local entity passed was NULL.");
+        if (dataObj == null)
+            throw new RuntimeException("Data object passed was NULL.");
+        if (dataObj.getId() == null)
+            throw new RuntimeException("Data object's ID was NULL.");
+
+        DataProducedDocument activity = DataProducedDocument.Factory.newInstance();
+        DataProductNotificationType activityType = activity.addNewDataProduced();
+
+        // set the data product to the produced data
+        DataProductType dataProduct = activityType.addNewDataProduct();
+        // set data ID and size
+        dataProduct.setId(dataObj.getId().toString());
+        dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+        // set data URLs
+        List<URI> locations = dataObj.getLocations();
+        for (URI location : locations) {
+            dataProduct.addLocation(location.toString());
+        }
+        // set data timestampp
+        final Calendar cal = new GregorianCalendar();
+        cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+        dataProduct.setTimestamp(cal);
+
+        sendNotification(context, activity, descriptionAndAnnotation, "[produced: ID=<" + dataObj.getId().toString()
+                + ">; URL=<#" + locations.size() + "><" + (locations.size() > 0 ? locations.get(0) : "") + ">]");
+
+        return dataObj;
+    }
+
+    /**
+     * Adds the file/directory was used by this invocation to the current dataProduced notification batch. If the
+     * notification batch did not exist, it is created. The notification is not sent untill {@link #flush()} is called.
+     * 
+     * @param entity
+     *            identity of the workflow/service's invocation that produced this file
+     * @param dataObj
+     *            data object recording the dataId, local/remote URLs, timestamp of the file/dir, that was returned by
+     *            another data notification method
+     * @param descriptionAndAnnotation
+     *            optional vararg. The first element is used as the human readable description for this notification.
+     *            The subsequent strings need to be serialized XML fragments that are added as annotation to the
+     *            notification.
+     * 
+     * @return the data object passed to this method with file/dir size filled in if not already when passed.
+     * 
+     */
+    protected DataObj dataProducedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
+            String... descriptionAndAnnotation) {
+
+        if (entity == null)
+            throw new RuntimeException("Local entity passed was NULL.");
+        if (dataObj == null)
+            throw new RuntimeException("Data object passed was NULL.");
+        if (dataObj.getId() == null)
+            throw new RuntimeException("Data object's ID was NULL.");
+
+        if (dataProducedBatchActivity == null) {
+
+            // create initial produced notification container
+            dataProducedBatchActivity = DataProducedDocument.Factory.newInstance();
+        }
+
+        // get existing produced notification container
+        DataProductNotificationType activityType = dataProducedBatchActivity.addNewDataProduced();
+
+        // add new data product to the produced data
+        DataProductType dataProduct = activityType.addNewDataProduct();
+        // set data ID and size
+        dataProduct.setId(dataObj.getId().toString());
+        dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+        // set data URLs
+        List<URI> locations = dataObj.getLocations();
+        for (URI location : locations) {
+            dataProduct.addLocation(location.toString());
+        }
+        // set data timestamp
+        final Calendar cal = new GregorianCalendar();
+        cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+        dataProduct.setTimestamp(cal);
+
+        // add description, and annotation to DATA PRODUCT if present
+        sendNotification(context, dataProducedBatchActivity, descriptionAndAnnotation, "[produced: ID=<"
+                + dataObj.getId().toString() + ">; URL=<#" + locations.size() + "><"
+                + (locations.size() > 0 ? locations.get(0) : "") + ">]");
+
+        return dataObj;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.airavata.workflow.tracking.util.LinkedMessageQueue;
+import org.apache.airavata.workflow.tracking.util.Timer;
+import org.apache.log4j.Logger;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * Abstract method to publish messages in sync or async mode. In async mode, the messages are kept in an in-memory queue
+ * and published. Calling flush() blocks till all messages are sent and the queue is empty. In sync mode, the call
+ * blocks till the message is transmitted.
+ */
+public abstract class AbstractPublisher implements Runnable, NotificationPublisher {
+
+    protected static final org.apache.log4j.Logger logger = Logger.getLogger(AbstractPublisher.class);
+    protected static final boolean IS_LOG_FINEST = logger.isDebugEnabled();
+    private final LinkedMessageQueue<BrokerEntry> messageQueue;
+    protected static final boolean IS_TIMER = Boolean.getBoolean("ENABLE_TIMER");
+    protected static final Timer notifTP = Timer.init("PubNotif");
+
+    private boolean finished = false;
+    private final Lock LOCK = new ReentrantLock();
+    private final Condition CONDITION = LOCK.newCondition();
+    private static int PUB_ID = 0;
+
+    private final boolean IS_DEFAULT_MODE_ASYNC;
+
+    private boolean deleted = false;
+
+    private final Thread pubThread;
+
+    protected AbstractPublisher(int capacity, boolean defaultAsync) {
+
+        messageQueue = new LinkedMessageQueue<BrokerEntry>(capacity);
+        IS_DEFAULT_MODE_ASYNC = defaultAsync;
+        deleted = false;
+        pubThread = new Thread(this, "PUBLISHER #" + PUB_ID++);
+        pubThread.setDaemon(true);
+        pubThread.start();
+    }
+
+    // public abstract void publishSync(String leadMessage);
+    public final void delete() {
+        deleted = true;
+        pubThread.interrupt();
+    }
+
+    public final boolean isDeleted() {
+        return deleted;
+    }
+
+    public final void publish(String leadMessage) {
+
+        if (IS_DEFAULT_MODE_ASYNC) {
+            publishAsync(leadMessage);
+        } else {
+            publishSync(leadMessage);
+        }
+    }
+
+    public final void publish(XmlObject xmlMessage) {
+
+        if (IS_DEFAULT_MODE_ASYNC) {
+            publishAsync(xmlMessage);
+        } else {
+            publishSync(xmlMessage.xmlText());
+        }
+    }
+
+    public final void publishAsync(String leadMessage) {
+
+        if (IS_LOG_FINEST) {
+            logger.debug("ASYNC: adding to queue, notification: " + leadMessage);
+        }
+        final BrokerEntry brokerEntry = new BrokerEntry(leadMessage);
+        try {
+            messageQueue.put(brokerEntry);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+        }
+    }
+
+    public final void publishAsync(XmlObject xmlMessage) {
+
+        if (IS_LOG_FINEST) {
+            logger.debug("ASYNC: adding to queue, notification: " + xmlMessage);
+        }
+
+        final BrokerEntry brokerEntry = new BrokerEntry(xmlMessage);
+        try {
+            messageQueue.put(brokerEntry);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+        }
+    }
+
+    public final void publishSync(XmlObject xmlMessage) {
+
+        if (IS_LOG_FINEST) {
+            logger.debug("SYNC: sending notification: " + xmlMessage);
+        }
+        publishSync(xmlMessage.xmlText());
+    }
+
+    public final void flush() {
+
+        finished = true;
+        LOCK.lock();
+        while (messageQueue.size() > 0) {
+            try {
+                // wait to be signalled that all messages were sent...
+                CONDITION.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+            }
+        }
+        finished = false;
+        CONDITION.signal(); // send ack...
+        LOCK.unlock();
+        return;
+    }
+
+    public final void run() {
+
+        BrokerEntry brokerEntry = null;
+        while (true) {
+
+            try {
+                // get the head from queue, but dont remove it yet
+                // block for message to arrive only if not finished;
+                // if finished, dont block...just quit
+                brokerEntry = finished ? messageQueue.peek() : messageQueue.get();
+                if (brokerEntry == null) {
+
+                    // the queue has been flushed
+                    if (finished) {
+                        LOCK.lock();
+                        CONDITION.signal(); // signal flushed queue...
+                        try {
+                            CONDITION.await(); // and wait for ack.
+                        } catch (InterruptedException e) {
+                            throw e;
+                        }
+                        LOCK.unlock();
+                    } else { /* ignore...this should not happen */
+                    }
+
+                    // go back to to start and wait for new message in flushed queue...
+                    continue;
+
+                } else {
+
+                    if (IS_LOG_FINEST) {
+                        logger.debug("ASYNC: sending notification: " + brokerEntry.getMessage());
+                    }
+
+                    // publish message
+                    publishSync(brokerEntry.getMessage());
+
+                    // remove the published head from queue
+                    messageQueue.poll();
+                }
+
+            } catch (InterruptedException e) {
+                if (deleted)
+                    break;
+                else
+                    logger.fatal("Interrupted when queue size: " + messageQueue.size() + ". deleted == false", e);
+            } catch (RuntimeException e) {
+
+                logger.fatal("Runtime Error: " + e.getMessage());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Runtime Error at message: "
+                            + (brokerEntry != null ? brokerEntry.getMessage() : "NULL") + "; queue size: "
+                            + messageQueue.size(), e);
+                }
+                // fixme: we should remove the entry from queue if it cannot be sent...
+                // otherwise, if broker is down, this may cause an infinite loop!!!
+            }
+        }
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import org.apache.xmlbeans.XmlObject;
+
+final class BrokerEntry {
+
+    public BrokerEntry(String message) {
+        this.message = message;
+        this.xml = null;
+    }
+
+    public BrokerEntry(XmlObject xml) {
+        this.xml = xml;
+        this.message = null;
+    }
+
+    public final String getMessage() {
+        if (message == null)
+            return xml.xmlText();
+        else
+            return message;
+    }
+
+    public final XmlObject getXmlObject() {
+        return xml;
+    }
+
+    private final String message;
+    private final XmlObject xml;
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.io.PrintStream;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.common.ConstructorConsts;
+import org.apache.airavata.workflow.tracking.common.ConstructorProps;
+import org.apache.airavata.workflow.tracking.util.MessageUtil;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * acts as a workflow tracking NotificationPublisher that calls a workflow tracking callback listener without having to
+ * pass though a real notification broker. Default listener prints to stdout
+ */
+public class LoopbackPublisher extends AbstractPublisher implements NotificationPublisher {
+
+    private Callback listener;
+    private String topic;
+    private static int globalCount = 0;
+
+    public LoopbackPublisher(Callback listener_, String topic_) {
+        super(10, false); // capacity, async
+        topic = topic_;
+        listener = listener_;
+        if (listener == null) {
+            listener = new Callback() {
+                int count = 0;
+
+                public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+                    System.out
+                            .printf("----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+                                    count, globalCount, topic, notificationType, messageObj);
+                    count++;
+                    globalCount++;
+                }
+            };
+        }
+    }
+
+    public LoopbackPublisher(final PrintStream out_, String topic_) {
+        this(new Callback() {
+            int count = 0;
+
+            public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+                out_.printf(
+                        "----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+                        count, globalCount, topic, notificationType, messageObj);
+                count++;
+                globalCount++;
+            }
+        }, topic_);
+    }
+
+    public LoopbackPublisher(String topic_) {
+        this(System.out, topic_);
+    }
+
+    public LoopbackPublisher(ConstructorProps props) {
+        this((Callback) props.get(ConstructorConsts.CALLBACK_LISTENER), (String) props.get(ConstructorConsts.TOPIC));
+    }
+
+    /**
+     * Method publishSync
+     * 
+     * @param message
+     *            a String message that should be a valid XML String (serialized XML document)
+     * 
+     */
+    public void publishSync(String message) {
+
+        try {
+            XmlObject xmlMessage = XmlObject.Factory.parse(message);
+            NotificationType type = MessageUtil.getType(xmlMessage);
+            listener.deliverMessage(topic, type, xmlMessage);
+        } catch (XmlException e) {
+            System.err.println("Error parsing workflow tracking message : [" + message + "]\n" + "as an XML Object");
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String args[]) {
+
+        LoopbackPublisher publisher = new LoopbackPublisher(new Callback() {
+            int count = 0;
+
+            public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+                System.out.printf("----\nReceived Message [%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+                        count++, topic, notificationType, messageObj);
+            }
+        }, "testTopic");
+
+        // create & publish log message
+        {
+            LogInfoDocument logMsg = LogInfoDocument.Factory.newInstance();
+            BaseNotificationType log = logMsg.addNewLogInfo();
+            // add timestamp
+            final Calendar cal = new GregorianCalendar();
+            cal.setTime(new Date());
+            log.setTimestamp(cal);
+            // add notification source
+            BaseIDType baseID = BaseIDType.Factory.newInstance();
+            baseID.setServiceID("http://tempuri.org/test_service");
+            log.addNewNotificationSource().set(baseID);
+            // add description
+            log.setDescription("A test message");
+
+            // publish message as XML Object
+            publisher.publish(logMsg);
+        }
+        // create & publish publishURl message
+        {
+            // create publish URL message
+            PublishURLDocument pubMsg = PublishURLDocument.Factory.newInstance();
+            PublishURLDocument.PublishURL pub = pubMsg.addNewPublishURL();
+            // add timestamp
+            final Calendar cal = new GregorianCalendar();
+            cal.setTime(new Date());
+            pub.setTimestamp(cal);
+            // add notification source
+            BaseIDType baseID = BaseIDType.Factory.newInstance();
+            baseID.setServiceID("http://tempuri.org/test_service");
+            pub.addNewNotificationSource().set(baseID);
+            pub.setTitle("Some URL's Title");
+            pub.setLocation("http://tempuri.org/published_url");
+
+            // publish message as XML string
+            publisher.publish(pubMsg.xmlText());
+        }
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import org.apache.xmlbeans.XmlObject;
+
+public interface NotificationPublisher {
+
+    public void publish(String leadMessage);
+
+    public void publish(XmlObject xmlMessage);
+
+    public void publishSync(String leadMessage);
+
+    public void publishSync(XmlObject xmlMessage);
+
+    public void publishAsync(String leadMessage);
+
+    public void publishAsync(XmlObject xmlMessage);
+
+    public void flush();
+
+    public void delete();
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URL;
+import java.util.Properties;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.commons.WorkFlowUtils;
+import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
+import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
+import org.apache.airavata.workflow.tracking.util.ConfigKeys;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Publish WS-Eventing messages using WS-Messenger client API
+ * 
+ */
+public class WSMPublisher extends AbstractPublisher implements NotificationPublisher {
+
+    protected final WseMsgBrokerClient broker;
+    protected final EndpointReference brokerEpr;
+    protected Properties configs = new Properties();
+
+    private Logger log = Logger.getLogger(WSMPublisher.class);
+
+    public WSMPublisher(WorkflowTrackingContext context) {
+        this(10, context.isEnableAsyncPublishing(), context.getBrokerEpr());
+    }
+
+    public WSMPublisher(int capacity, boolean defaultAsync, String brokerLoc, String topic) throws IOException {
+        super(capacity, defaultAsync);
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        configs.load(configURL.openStream());
+        broker = new WseMsgBrokerClient();
+        brokerEpr = broker.createEndpointReference(brokerLoc, topic);
+        broker.init(brokerEpr.getAddress());
+    }
+
+    public WSMPublisher(int capacity, boolean defaultAsync, EndpointReference brokerEpr_)
+            throws WorkflowTrackingException {
+        super(capacity, defaultAsync);
+        try {
+            URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+            configs.load(configURL.openStream());
+            brokerEpr = brokerEpr_;
+            broker = new WseMsgBrokerClient();
+            broker.init(brokerEpr_.getAddress());
+        } catch (IOException e) {
+            throw new WorkflowTrackingException(e);
+        }
+    }
+
+    public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_) throws IOException {
+
+        this(capacity, defaultAsync, brokerEpr_, false);
+
+    }
+
+    public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_, boolean isXmlEpr) throws IOException {
+
+        super(capacity, defaultAsync);
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        configs.load(configURL.openStream());
+
+        if (!isXmlEpr) {
+            brokerEpr = new EndpointReference(brokerEpr_);// EndpointReferenceHelper.fro(brokerEpr_);
+
+        } else {
+            brokerEpr = EndpointReferenceHelper.fromString(brokerEpr_);
+        }
+
+        broker = new WseMsgBrokerClient();
+        broker.init(brokerEpr.getAddress());
+    }
+
+    /**
+     * Method publishSync
+     * 
+     * @param leadMessage
+     *            a String
+     * 
+     */
+    public void publishSync(String leadMessage) {
+        if (isDeleted())
+            throw new RuntimeException("Publisher has been deleted!");
+        if (IS_LOG_FINEST) {
+            logger.debug("publishing notification to messenger broker: " + leadMessage);
+        }
+        try {
+            OMElement msg = WorkFlowUtils.reader2OMElement(new StringReader(leadMessage));
+            broker.publish(null, msg);
+
+        } catch (MsgBrokerClientException e) {
+            log.error("unablet to publish the lead message", e);
+        } catch (XMLStreamException e) {
+            log.error("unable to parse the load message - " + leadMessage, e);
+        }
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.net.URI;
+
+import org.apache.airavata.workflow.tracking.common.DataDurationObj;
+import org.apache.airavata.workflow.tracking.common.DataObj;
+
+public class DataDurationImpl extends DurationImpl implements DataDurationObj {
+
+    protected DataObj dataObj;
+    protected URI remoteLocation;
+
+    public DataDurationImpl(DataObj dataObj_, URI remoteLocation_) {
+
+        super(); // set start time to now
+        dataObj = dataObj_;
+        remoteLocation = remoteLocation_;
+    }
+
+    public DataDurationImpl(DataObj dataObj_, URI remoteLocation_, long fixedDuration) {
+
+        super(fixedDuration); // set duration to passed value
+        dataObj = dataObj_;
+        remoteLocation = remoteLocation_;
+    }
+
+    public DataObj getDataObj() {
+
+        return dataObj;
+    }
+
+    public URI getRemoteLocation() {
+
+        return remoteLocation;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.airavata.workflow.tracking.common.DataObj;
+
+public class DataObjImpl implements DataObj {
+
+    protected URI dataId;
+    protected List<URI> locations;
+    protected long sizeInBytes = -1;
+
+    public DataObjImpl(URI dataId_, List<URI> location_) {
+
+        dataId = dataId_;
+        if (dataId == null || dataId.toString().length() == 0)
+            throw new RuntimeException("Data ID cannot be NULL or empty");
+
+        locations = location_;
+    }
+
+    public DataObjImpl(URI dataId_, List<URI> location_, long sizeInBytes_) {
+
+        this(dataId_, location_);
+        sizeInBytes = sizeInBytes_;
+    }
+
+    public URI getId() {
+
+        return dataId;
+    }
+
+    public URI getLocalLocation() {
+
+        return locations != null && locations.size() > 0 ? locations.get(0) : null;
+    }
+
+    public List<URI> getLocations() {
+
+        return locations;
+    }
+
+    public long getSizeInBytes() {
+
+        // skip getting bytes if already calculated or not possible to calculate
+        if (sizeInBytes >= 0 || locations == null || locations.size() == 0)
+            return sizeInBytes;
+
+        // check if the location is a local file. If so, we calculate the size.
+        URI location = locations.get(0);
+        String scheme = location.getScheme();
+        String authority = location.getAuthority();
+        if ((scheme == null && authority == null) || "file".equals(scheme)) {
+            sizeInBytes = getFileSize(new File(location.getPath()));
+        }
+        return sizeInBytes;
+    }
+
+    protected static final long getFileSize(File file) {
+        if (file.isDirectory()) {
+            return getDirSize(file, 0, true);
+        } else {
+            return file.length();
+        }
+    }
+
+    private static final long getDirSize(File dir, long size, boolean recurse) {
+        File[] files = dir.listFiles();
+        if (files == null)
+            return size;
+        for (int i = 0; i < files.length; i++) {
+            if (files[i].isDirectory()) {
+                if (recurse)
+                    size += getDirSize(files[i], size, recurse);
+            } else {
+                size += files[i].length();
+            }
+        }
+        return size;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import org.apache.airavata.workflow.tracking.common.DurationObj;
+
+/**
+ * Convinience class to record the state of computation related notifications.
+ */
+public class DurationImpl implements DurationObj {
+
+    protected long startTimeMillis = Long.MAX_VALUE;
+    protected long endTimeMillis = Long.MIN_VALUE;
+    protected long fixedDuration = Long.MIN_VALUE;
+    protected boolean isFixedDuration = false;
+
+    public DurationImpl() {
+        startTimeMillis = System.currentTimeMillis();
+    }
+
+    public DurationImpl(long fixedDuration_) {
+        isFixedDuration = true;
+        fixedDuration = fixedDuration_;
+    }
+
+    public long markStartTimeMillis() {
+
+        this.startTimeMillis = System.currentTimeMillis();
+        return startTimeMillis;
+    }
+
+    public long getStartTimeMillis() {
+
+        return startTimeMillis;
+    }
+
+    public long markEndTimeMillis() {
+
+        this.endTimeMillis = System.currentTimeMillis();
+        return endTimeMillis;
+    }
+
+    public long getEndTimeMillis() {
+
+        return endTimeMillis;
+    }
+
+    public long getDurationMillis() {
+
+        if (isFixedDuration)
+            return fixedDuration;
+        return endTimeMillis - startTimeMillis;
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import org.apache.airavata.workflow.tracking.common.InvocationContext;
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+
+public class InvocationContextImpl extends InvocationEntityImpl implements InvocationContext {
+
+    InvocationEntity remoteEntity;
+
+    public InvocationContextImpl(InvocationEntity localEntity_, InvocationEntity remoteEntity_) {
+        super(localEntity_);
+        remoteEntity = remoteEntity_;
+    }
+
+    public InvocationEntity getRemoteEntity() {
+
+        return remoteEntity;
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.net.URI;
+
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+
+public class InvocationEntityImpl implements InvocationEntity {
+
+    protected URI workflowID;
+    protected URI serviceID;
+    protected String workflowNodeID;
+    protected Integer workflowTimestep;
+
+    /**
+     * Constructor used when only service ID is available (i.e. entity not in the context of an invocation)
+     * 
+     * @param serviceID_
+     *            an URI
+     * 
+     */
+    public InvocationEntityImpl(URI serviceID_) {
+
+        if (serviceID_ == null)
+            throw new RuntimeException("ServiceID passed was null!");
+
+        serviceID = serviceID_;
+
+        workflowID = null;
+        workflowNodeID = null;
+        workflowTimestep = null;
+    }
+
+    /**
+     * Constructor used when all IDs are potentially available (i.e. entity in the context of an invocation)
+     * 
+     * @param workflowID_
+     *            an URI
+     * @param serviceID_
+     *            an URI
+     * @param workflowNodeID_
+     *            a String
+     * @param workflowTimestep_
+     *            an int
+     * 
+     */
+    public InvocationEntityImpl(URI workflowID_, URI serviceID_, String workflowNodeID_, Integer workflowTimestep_) {
+
+        if (serviceID_ == null)
+            throw new RuntimeException("ServiceID passed was null!");
+
+        workflowID = workflowID_;
+        serviceID = serviceID_;
+        workflowNodeID = workflowNodeID_;
+        workflowTimestep = workflowTimestep_;
+    }
+
+    /**
+     * Copy Constructor
+     * 
+     * @param source
+     *            an InvocationEntity
+     * 
+     */
+    protected InvocationEntityImpl(InvocationEntity source) {
+        this(source.getWorkflowID(), source.getServiceID(), source.getWorkflowNodeID(), source.getWorkflowTimestep());
+    }
+
+    public String getWorkflowNodeID() {
+
+        return workflowNodeID;
+    }
+
+    public URI getServiceID() {
+
+        return serviceID;
+    }
+
+    public Integer getWorkflowTimestep() {
+
+        return workflowTimestep;
+    }
+
+    public URI getWorkflowID() {
+
+        return workflowID;
+    }
+
+    public BaseIDType toBaseIDType() {
+
+        BaseIDType baseID = BaseIDType.Factory.newInstance();
+        if (serviceID != null)
+            baseID.setServiceID(serviceID.toString());
+        if (workflowID != null)
+            baseID.setWorkflowID(workflowID.toString());
+        if (workflowTimestep != null)
+            baseID.setWorkflowTimestep(workflowTimestep);
+        if (workflowNodeID != null)
+            baseID.setWorkflowNodeID(workflowNodeID);
+
+        return baseID;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java Sun Jul  3 18:32:59 2011
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.subscription;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.client.Subscription;
+import org.apache.airavata.workflow.tracking.util.MessageUtil;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.log4j.Logger;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * Utility for clients to subscribe and receive Lead notifications using new message schema. The agent implements the
+ * LeadNotificationHandler.Callback interface and starts the notification handler with the broker location, the topic,
+ * and an option to pull the messages (to get around firewalls) by providing a message box url. The deliverMessage
+ * method in the Callback interface in invoked when a message with the nes LEAD message type arrives. If a
+ * LeadEvent/NCSAEvent arrives, it is silently dropped after being logged. Check the main() method for sample usage.
+ */
+public class LeadNotificationHandler implements ConsumerNotificationHandler {
+
+    private final static org.apache.log4j.Logger logger = Logger.getLogger(LeadNotificationHandler.class);
+
+    private String topic;
+
+    private String brokerLoc;
+
+    private Callback callback;
+
+    private int consumerServerPort;
+
+    public LeadNotificationHandler(String brokerLoc, String topic, Callback callback, int port) {
+        if (port == 0)
+            this.consumerServerPort = 2222;
+        else
+            this.consumerServerPort = port;
+        this.brokerLoc = brokerLoc;
+        this.topic = topic;
+        this.callback = callback;
+
+    }
+
+    /**
+     * NON API Method. Use LeadNotificationManager.CreateSubscription() method to create a subscription
+     * 
+     * @param topic
+     * @param callback
+     * @return
+     * @throws Exception
+     */
+    public Subscription createSubscription() throws Exception {
+        WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
+        wseClient.init(brokerLoc);
+        logger.debug("Starting Subscription for topic [" + topic + "]at the broker location:" + brokerLoc);
+        ConsumerServer xs = new ConsumerServer(consumerServerPort, this);
+        xs.start();
+        String subscriptionId = wseClient.subscribe(xs.getConsumerServiceEPRs()[0], topic, null);
+        logger.info("The consumer server started on EPR" + xs.getConsumerServiceEPRs()[0]);
+        Subscription subscription = new Subscription(xs, subscriptionId, topic, callback, brokerLoc);
+        return subscription;
+    }
+
+    /**
+     * NONAPI method Method handleNotification. Called by the message broker when a message arrives at the subscribed
+     * topic. Should NOT be called locally. This method will call the Callback interface's deliverMessage when a valid
+     * Lead Message is received.
+     * 
+     * @param messageBody
+     *            the soap message body containing the notification message
+     * 
+     */
+
+    public void handleNotification(SOAPEnvelope envelope) {
+        OMElement messageContent = envelope.getBody().getFirstElement();
+        SOAPHeader soapHeader = envelope.getHeader();
+        OMElement topicEl = soapHeader.getFirstChildWithName(new QName(null, "Topic"));
+        XmlObject messageObj = null;
+
+        if (topicEl != null) {
+            if (topicEl.getChildElements().hasNext()) {
+                OMElement widgetTopicOMEl = (OMElement) topicEl.getChildElements().next();
+                String widgetTopicString = null;
+                try {
+                    widgetTopicString = widgetTopicOMEl.toStringWithConsume();
+                } catch (XMLStreamException e) {
+                    // TODO add with throws
+                    e.printStackTrace();
+                }
+                String[] topicSubstrings = widgetTopicString.split(":");
+                if (topicSubstrings.length > 1) {
+                    topic = topicSubstrings[1];
+                }
+            }
+        }
+
+        if (topic != null) {
+            try {
+                try {
+                    messageObj = XmlObject.Factory.parse(messageContent.toStringWithConsume());
+                } catch (XMLStreamException e) {
+                    // TODO add with throws
+                    e.printStackTrace();
+                }
+                XmlCursor xc = messageObj.newCursor();
+                xc.toNextToken();
+
+                xc.dispose();
+            } catch (XmlException e) {
+                logger.fatal("error parsing message content: " + messageContent, e);
+                e.printStackTrace();
+            }
+            NotificationType type = MessageUtil.getType(messageObj);
+            this.callback.deliverMessage(topic, type, messageObj);
+
+        } else {
+            logger.info("Notification came without a Notification Topic:" + envelope);
+        }
+    }
+
+}