You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/07/03 20:33:03 UTC
svn commit: r1142475 [4/7] - in
/incubator/airavata/ws-messaging/trunk/workflow-tracking: ./ .settings/
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/airavata/ src/main/java/org/apache/airavata/comm...
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/ProvenanceNotifierImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,795 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URI;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.airavata.workflow.tracking.ProvenanceNotifier;
+import org.apache.airavata.workflow.tracking.common.DataObj;
+import org.apache.airavata.workflow.tracking.common.InvocationContext;
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
+import org.apache.airavata.workflow.tracking.impl.state.DataObjImpl;
+import org.apache.airavata.workflow.tracking.impl.state.InvocationContextImpl;
+import org.apache.airavata.workflow.tracking.impl.state.InvocationEntityImpl;
+import org.apache.xmlbeans.XmlObject;
+import org.apache.xmlbeans.XmlString;
+
+/**
+ * DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING
+ *
+ * Utility to create and send Lead notification messages using new notification schema from a Workflow Engine
+ *
+ * The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
+ * TIMESTEP, SERVICE_ID, ASYNC_PUB_MODE
+ */
+public class ProvenanceNotifierImpl extends GenericNotifierImpl implements ProvenanceNotifier {
+
+ private DataConsumedDocument dataConsumedBatchActivity;
+ private DataProducedDocument dataProducedBatchActivity;
+
+ // public ProvenanceNotifierImpl(ConstructorProps props) throws XMLStreamException, IOException {
+ // super(props);
+ // DATA_BATCHED = Boolean.parseBoolean((String)props.get(ENABLE_BATCH_PROVENANCE));
+ // }
+
+ /**
+  * Creates a notifier without any configuration of its own; construction is left entirely to the no-argument
+  * constructor of the superclass.
+  */
+ public ProvenanceNotifierImpl() {
+     // the superclass no-arg constructor runs implicitly
+ }
+
+ /**
+ * this method allows us to override the default timestamp with a user supplied one
+ *
+ * @param msg
+ * a BaseNotificationType
+ * @param entity
+ * an InvocationEntity
+ *
+ */
+ // @Override
+ // protected void setIDAndTimestamp(WorkfloBaseNotificationType msg, InvocationEntity entity) {
+ // if(activityTimestamp == null)
+ // super.setIDAndTimestamp(msg, entity);
+ // else
+ // super.setIDAndTimestamp(msg, entity, activityTimestamp);
+ // }
+
+ // protected void setIDAndTimestamp(BaseNotificationType msg, URI serviceID) {
+ // setIDAndTimestamp(msg, createEntity(serviceID));
+ // }
+
+ protected InvocationEntity createEntity(URI serviceID) {
+
+ return new InvocationEntityImpl(serviceID);
+ }
+
+ // /////////////////////////////////////////////////////////////////////////////
+ // //
+ // WORKFLOW NOTIFIER //
+ // //
+ // /////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Creates a {@code WorkflowInitializedDocument} with an empty root element and hands it to
+  * {@code sendNotification}. The {@code serviceID} argument is not read by this implementation.
+  */
+ public void workflowInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+     final String defaultDescription = "[Workflow is initialized; ready to be invoked]";
+
+     WorkflowInitializedDocument notification = WorkflowInitializedDocument.Factory.newInstance();
+     notification.addNewWorkflowInitialized();
+
+     // sendNotification is given the caller's description/annotations plus the default description text
+     sendNotification(context, notification, descriptionAndAnnotation, defaultDescription);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Creates a {@code WorkflowTerminatedDocument} with an empty root element and hands it to
+  * {@code sendNotification}. The {@code serviceID} argument is not read by this implementation.
+  */
+ public void workflowTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+     WorkflowTerminatedDocument activity = WorkflowTerminatedDocument.Factory.newInstance();
+     // only the root element has to exist; nothing is written into it here, so its reference is not kept
+     activity.addNewWorkflowTerminated();
+     sendNotification(context, activity, descriptionAndAnnotation,
+             "[Workflow is terminated; cannot be invoked anymore]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noBody = null;
+     return workflowInvoked(context, initiator, noHeader, noBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Builds a {@code WorkflowInvokedDocument}, records the initiator when one is given (a warning is logged
+  * otherwise), attaches a request element only if a header or a body was supplied, and sends the notification.
+  *
+  * @return a new {@link InvocationContextImpl} built from {@code context.getMyself()} and {@code initiator}
+  */
+ public InvocationContext workflowInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+         XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+     WorkflowInvokedDocument notification = WorkflowInvokedDocument.Factory.newInstance();
+     RequestReceiverType invoked = notification.addNewWorkflowInvoked();
+
+     // the initiator is stored as the remote entity of the returned context
+     InvocationContextImpl result = new InvocationContextImpl(context.getMyself(), initiator);
+
+     if (initiator == null) {
+         logger.warn("Possible Error in context that was passed. "
+                 + "There was no remote invoker defined for workflow invoked (initiator=NULL)");
+     } else {
+         invoked.addNewInitiator().set(initiator.toBaseIDType());
+     }
+
+     // a request element is created only when there is something to put into it
+     final boolean hasMessage = header != null || body != null;
+     if (hasMessage) {
+         InvocationMessageType request = invoked.addNewRequest();
+         if (header != null) {
+             request.addNewHeader().set(header);
+         }
+         if (body != null) {
+             request.addNewBody().set(body);
+         }
+     }
+
+     sendNotification(context, notification, descriptionAndAnnotation, "[Workflow is invoked]");
+     return result;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noBody = null;
+     return invokingService(context, receiver, noHeader, noBody, descriptionAndAnnotation);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ */
+ public InvocationContext invokingService(WorkflowTrackingContext context, InvocationEntity receiver,
+ XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+ InvokingServiceDocument activity = InvokingServiceDocument.Factory.newInstance();
+ RequestInitiatorType activityType = activity.addNewInvokingService();
+
+ // create the invocation context; set the receiver to the remote entity
+ InvocationContextImpl invocationContext = new InvocationContextImpl(context.getMyself(), receiver);
+ activityType.addNewReceiver().set(receiver.toBaseIDType());
+
+ // add header and body fields
+ if (header != null || body != null) {
+ InvocationMessageType request = activityType.addNewRequest();
+ if (header != null)
+ request.addNewHeader().set(header);
+ if (body != null)
+ request.addNewBody().set(body);
+ }
+ sendNotification(context, activity, descriptionAndAnnotation, "[Service is invoked]");
+ return invocationContext;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends an {@code InvokingServiceSucceededDocument} whose receiver is the remote entity of {@code context}; a
+  * missing remote entity is only logged as a warning.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void invokingServiceSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     InvokingServiceSucceededDocument notification = InvokingServiceSucceededDocument.Factory.newInstance();
+     AcknowledgeSuccessType ack = notification.addNewInvokingServiceSucceeded();
+
+     // the remote entity of the invocation becomes the receiver of the acknowledgement
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Error in context that was passed. "
+                 + "there was no remote entity defined (requestReceiver=NULL)");
+     } else {
+         ack.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation, "[Service finished successfully]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without a stack trace: forwards to the {@code Throwable} overload with a {@code null} trace.
+  */
+ public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final Throwable noTrace = null;
+     invokingServiceFailed(wtcontext, context, noTrace, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends an {@code InvokingServiceFailedDocument} whose receiver is the remote entity of {@code context} (a
+  * missing remote entity is only logged). When {@code trace} is given, its printed stack trace is stored as the
+  * failure trace of the notification.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void invokingServiceFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
+         String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     final InvokingServiceFailedDocument notification = InvokingServiceFailedDocument.Factory.newInstance();
+     final AcknowledgeFailureType failureAck = notification.addNewInvokingServiceFailed();
+
+     // the remote entity of the invocation becomes the receiver of the acknowledgement
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Error in context that was passed. "
+                 + "there was no remote entity defined (requestReceiver=NULL)");
+     } else {
+         failureAck.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     // render the stack trace to text and attach it, if a throwable was supplied
+     if (trace != null) {
+         StringWriter buffer = new StringWriter();
+         PrintWriter printer = new PrintWriter(buffer);
+         trace.printStackTrace(printer);
+
+         XmlString traceText = XmlString.Factory.newInstance();
+         traceText.setStringValue(buffer.toString());
+         failureAck.addNewFailure().addNewTrace().set(traceText);
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation, "[Service failed]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noBody = null;
+     receivedResult(wtcontext, context, noHeader, noBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code ReceivedResultDocument} whose responder is the remote entity of {@code context} (a missing
+  * remote entity is only logged). A result element is attached only if a header or a body was supplied.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void receivedResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+         XmlObject body, String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     ReceivedResultDocument activity = ReceivedResultDocument.Factory.newInstance();
+     ResultReceiverType activityType = activity.addNewReceivedResult();
+
+     // set the responder to the remote entity
+     if (context.getRemoteEntity() != null) {
+         activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
+     } else {
+         logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
+     }
+
+     // add header and body fields
+     if (header != null || body != null) {
+         InvocationMessageType result = activityType.addNewResult();
+         if (header != null) {
+             result.addNewHeader().set(header);
+         }
+         if (body != null) {
+             result.addNewBody().set(body);
+         }
+     }
+
+     // the default description previously read "[Service failed]", which is wrong for a received result
+     sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Result is received for invocation]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noFaultBody = null;
+     receivedFault(wtcontext, context, noHeader, noFaultBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code ReceivedFaultDocument} whose responder is the remote entity of {@code context} (a missing
+  * remote entity is only logged). A fault element carrying {@code header} and {@code faultBody} is attached when
+  * at least one of them is supplied; previously both arguments were accepted but never used.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void receivedFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+         XmlObject faultBody, String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     ReceivedFaultDocument activity = ReceivedFaultDocument.Factory.newInstance();
+     FaultReceiverType activityType = activity.addNewReceivedFault();
+
+     // set the responder to the remote entity
+     if (context.getRemoteEntity() != null) {
+         activityType.addNewResponder().set(context.getRemoteEntity().toBaseIDType());
+     } else {
+         logger.warn("Error in context that was passed. " + "There was no remote entity defined (responder=NULL)");
+     }
+
+     // add header and body fields, the same way sendingFault fills its fault element
+     // NOTE(review): assumes FaultReceiverType exposes addNewFault() like FaultResponderType does -- confirm
+     // against the generated schema classes
+     if (header != null || faultBody != null) {
+         FaultMessageType fault = activityType.addNewFault();
+         if (header != null) {
+             fault.addNewHeader().set(header);
+         }
+         if (faultBody != null) {
+             fault.addNewBody().set(faultBody);
+         }
+     }
+
+     sendNotification(wtcontext, activity, descriptionAndAnnotation, "[Fault is received for invocation ]");
+ }
+
+ // /////////////////////////////////////////////////////////////////////////////
+ // //
+ // SERVICE NOTIFIER //
+ // //
+ // /////////////////////////////////////////////////////////////////////////////
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Creates a {@code ServiceInitializedDocument} with an empty root element and hands it to
+  * {@code sendNotification}. The {@code serviceID} argument is not read by this implementation.
+  */
+ public void serviceInitialized(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+     final String defaultDescription = "[Service is initialized; ready to be invoked]";
+
+     ServiceInitializedDocument notification = ServiceInitializedDocument.Factory.newInstance();
+     notification.addNewServiceInitialized();
+
+     sendNotification(context, notification, descriptionAndAnnotation, defaultDescription);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Creates a {@code ServiceTerminatedDocument} with an empty root element and hands it to
+  * {@code sendNotification}. The {@code serviceID} argument is not read by this implementation.
+  */
+ public void serviceTerminated(WorkflowTrackingContext context, URI serviceID, String... descriptionAndAnnotation) {
+     final String defaultDescription = "[Service is terminated; cannot be invoked anymore]";
+
+     ServiceTerminatedDocument notification = ServiceTerminatedDocument.Factory.newInstance();
+     notification.addNewServiceTerminated();
+
+     sendNotification(context, notification, descriptionAndAnnotation, defaultDescription);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noBody = null;
+     return serviceInvoked(context, initiator, noHeader, noBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Builds a {@code ServiceInvokedDocument}, records the initiator when one is given (a warning is logged
+  * otherwise), attaches a request element only if a header or a body was supplied, and sends the notification.
+  *
+  * @return a new {@link InvocationContextImpl} built from {@code context.getMyself()} and {@code initiator}
+  */
+ public InvocationContext serviceInvoked(WorkflowTrackingContext context, InvocationEntity initiator,
+         XmlObject header, XmlObject body, String... descriptionAndAnnotation) {
+
+     ServiceInvokedDocument notification = ServiceInvokedDocument.Factory.newInstance();
+     RequestReceiverType invoked = notification.addNewServiceInvoked();
+
+     // the initiator is stored as the remote entity of the returned context
+     InvocationContextImpl result = new InvocationContextImpl(context.getMyself(), initiator);
+
+     if (initiator == null) {
+         logger.warn("Possible Error in context that was passed. "
+                 + "There was no remote invoker defined (initiator=NULL)");
+     } else {
+         invoked.addNewInitiator().set(initiator.toBaseIDType());
+     }
+
+     // a request element is created only when there is something to put into it
+     final boolean hasMessage = header != null || body != null;
+     if (hasMessage) {
+         InvocationMessageType request = invoked.addNewRequest();
+         if (header != null) {
+             request.addNewHeader().set(header);
+         }
+         if (body != null) {
+             request.addNewBody().set(body);
+         }
+     }
+
+     sendNotification(context, notification, descriptionAndAnnotation, "[Service is invoked]");
+     return result;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noBody = null;
+     sendingResult(wtcontext, context, noHeader, noBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code SendingResultDocument} whose receiver is the remote entity of {@code context} (a missing
+  * remote entity is only logged). A result element is attached only if a header or a body was supplied.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void sendingResult(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+         XmlObject body, String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     SendingResultDocument notification = SendingResultDocument.Factory.newInstance();
+     ResultResponderType responder = notification.addNewSendingResult();
+
+     // the remote entity of the invocation is the one the result goes to
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Possible Error in context that was passed. "
+                 + "There was no remote entity defined (responseReceiver=NULL)");
+     } else {
+         responder.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     // a result element is created only when there is something to put into it
+     final boolean hasMessage = header != null || body != null;
+     if (hasMessage) {
+         InvocationMessageType message = responder.addNewResult();
+         if (header != null) {
+             message.addNewHeader().set(header);
+         }
+         if (body != null) {
+             message.addNewBody().set(body);
+         }
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation,
+             "[Trying to send successful result of invocation]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without message content: forwards to the header/body overload with both set to {@code null}.
+  */
+ public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final XmlObject noHeader = null;
+     final XmlObject noFaultBody = null;
+     sendingFault(wtcontext, context, noHeader, noFaultBody, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code SendingFaultDocument} whose receiver is the remote entity of {@code context} (a missing remote
+  * entity is only logged). A fault element is attached only if a header or a fault body was supplied.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void sendingFault(WorkflowTrackingContext wtcontext, InvocationContext context, XmlObject header,
+         XmlObject faultBody, String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     SendingFaultDocument notification = SendingFaultDocument.Factory.newInstance();
+     FaultResponderType responder = notification.addNewSendingFault();
+
+     // the remote entity of the invocation is the one the fault goes to
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Error in context that was passed. "
+                 + "There was no remote entity defined (responseReceiver=NULL)");
+     } else {
+         responder.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     // a fault element is created only when there is something to put into it
+     final boolean hasMessage = header != null || faultBody != null;
+     if (hasMessage) {
+         FaultMessageType fault = responder.addNewFault();
+         if (header != null) {
+             fault.addNewHeader().set(header);
+         }
+         if (faultBody != null) {
+             fault.addNewBody().set(faultBody);
+         }
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation, "[Trying to sending fault from invocation]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code SendingResponseSucceededDocument} whose receiver is the remote entity of {@code context}; a
+  * missing remote entity is only logged as a warning.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void sendingResponseSucceeded(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     SendingResponseSucceededDocument notification = SendingResponseSucceededDocument.Factory.newInstance();
+     AcknowledgeSuccessType ack = notification.addNewSendingResponseSucceeded();
+
+     // the remote entity of the invocation becomes the receiver of the acknowledgement
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Error in context that was passed. "
+                 + "there was no remote entity defined (responseReceiver=NULL)");
+     } else {
+         ack.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation, "[Successfully sent response of invocation]");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Convenience form without a stack trace: forwards to the {@code Throwable} overload with a {@code null} trace.
+  */
+ public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context,
+         String... descriptionAndAnnotation) {
+     final Throwable noTrace = null;
+     sendingResponseFailed(wtcontext, context, noTrace, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Sends a {@code SendingResponseFailedDocument} whose receiver is the remote entity of {@code context} (a
+  * missing remote entity is only logged). When {@code trace} is given, its printed stack trace is stored as the
+  * failure trace of the notification.
+  *
+  * @throws RuntimeException
+  *             if {@code context} is {@code null}
+  */
+ public void sendingResponseFailed(WorkflowTrackingContext wtcontext, InvocationContext context, Throwable trace,
+         String... descriptionAndAnnotation) {
+
+     if (context == null) {
+         throw new RuntimeException("Context passed was NULL.");
+     }
+
+     SendingResponseFailedDocument notification = SendingResponseFailedDocument.Factory.newInstance();
+     AcknowledgeFailureType failureAck = notification.addNewSendingResponseFailed();
+
+     // the remote entity of the invocation becomes the receiver of the acknowledgement
+     if (context.getRemoteEntity() == null) {
+         logger.warn("Error in context that was passed. "
+                 + "there was no remote entity defined (responseReceiver=NULL)");
+     } else {
+         failureAck.addNewReceiver().set(context.getRemoteEntity().toBaseIDType());
+     }
+
+     // render the stack trace to text and attach it, if a throwable was supplied
+     if (trace != null) {
+         StringWriter buffer = new StringWriter();
+         PrintWriter printer = new PrintWriter(buffer);
+         trace.printStackTrace(printer);
+
+         XmlString traceText = XmlString.Factory.newInstance();
+         traceText.setStringValue(buffer.toString());
+         failureAck.addNewFailure().addNewTrace().set(traceText);
+     }
+
+     sendNotification(wtcontext, notification, descriptionAndAnnotation, "[Unable to send result of invocation]");
+ }
+
+ // /////////////////////////////////////////////////////////////////////////////
+ // //
+ // DATA PROVENANCE //
+ // //
+ // /////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the id and locations in a new {@link DataObjImpl} and forwards to the {@link DataObj} overload.
+  */
+ public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations,
+         String... descriptionAndAnnotation) {
+     final DataObj consumed = new DataObjImpl(dataId, locations);
+     return dataConsumed(context, consumed, descriptionAndAnnotation);
+ }
+
+ /**
+  * Variant of {@code dataConsumed} that also carries the size of the data: wraps the id, locations and size in
+  * a new {@link DataObjImpl} and forwards to the {@link DataObj} overload.
+  *
+  * @return whatever the {@link DataObj} overload returns for the newly created object
+  */
+ public DataObj dataConsumed(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
+         String... descriptionAndAnnotation) {
+     final DataObj consumed = new DataObjImpl(dataId, locations, sizeInBytes);
+     return dataConsumed(context, consumed, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Builds a {@code DataConsumedDocument} with a single data product (id, size, all locations, timestamp) and
+  * sends it. The default description lists the id, the number of locations and the first location, if any.
+  *
+  * @return the {@code dataObj} that was passed in
+  * @throws RuntimeException
+  *             if {@code context.getMyself()}, {@code dataObj} or its id is {@code null}
+  */
+ public DataObj dataConsumed(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
+     // the local entity is only validated here; it is not written into the notification by this method
+     if (context.getMyself() == null) {
+         throw new RuntimeException("Local entity passed was NULL.");
+     }
+     if (dataObj == null) {
+         throw new RuntimeException("Data object passed was NULL.");
+     }
+     if (dataObj.getId() == null) {
+         throw new RuntimeException("Data object's ID was NULL.");
+     }
+
+     DataConsumedDocument notification = DataConsumedDocument.Factory.newInstance();
+     DataProductNotificationType consumed = notification.addNewDataConsumed();
+
+     // describe the consumed data: id, size, then every known location
+     DataProductType product = consumed.addNewDataProduct();
+     product.setId(dataObj.getId().toString());
+     product.setSizeInBytes(dataObj.getSizeInBytes());
+     List<URI> locations = dataObj.getLocations();
+     for (URI location : locations) {
+         product.addLocation(location.toString());
+     }
+
+     // timestamp: activityTimestamp when it is set, otherwise the current time
+     Date stamp = activityTimestamp;
+     if (stamp == null) {
+         stamp = new Date();
+     }
+     final Calendar calendar = new GregorianCalendar();
+     calendar.setTime(stamp);
+     product.setTimestamp(calendar);
+
+     // default description: id, number of locations, and the first location (empty when there is none)
+     final String firstLocation = locations.size() == 0 ? "" : String.valueOf(locations.get(0));
+     StringBuilder summary = new StringBuilder("[consumed: ID=<");
+     summary.append(dataObj.getId().toString());
+     summary.append(">; URL=<#").append(locations.size());
+     summary.append("><").append(firstLocation).append(">]");
+
+     sendNotification(context, notification, descriptionAndAnnotation, summary.toString());
+
+     return dataObj;
+ }
+
+ // /**
+ // * Adds the file/directory was used by this invocation to the current dataConsuemd
+ // * notification batch. If the notification batch did not exist, it is created. The notification
+ // * is not sent until {@link #flush()} is called.
+ // *
+ // * @param entity identity of the workflow/service's invocation that consumed this file
+ // * @param dataObj data object recording the dataId, local/remote URLs, timestamp of
+ // * the file/dir, that was returned by another data notification method
+ // * @param descriptionAndAnnotation optional vararg. The first element is used as the
+ // * human readable description for this notification. The subsequent strings need to be
+ // * serialized XML fragments that are added as annotation to the notification.
+ // *
+ // * @return the data object passed to this method with file/dir size filled in if not
+ // * already when passed.
+ // *
+ // */
+ // protected DataObj dataConsumedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
+ // String...descriptionAndAnnotation) {
+ //
+ // if(entity == null) throw new RuntimeException("Local entity passed was NULL.");
+ // if(dataObj == null) throw new RuntimeException("Data object passed was NULL.");
+ // if(dataObj.getId() == null) throw new RuntimeException("Data object's ID was NULL.");
+ //
+ // if (dataConsumedBatchActivity == null) {
+ //
+ // // create initial consumed notification container
+ // dataConsumedBatchActivity = DataConsumedDocument.Factory.newInstance();
+ // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
+ //
+ //
+ // }
+ //
+ // // get existing consumed notification container
+ // DataProductNotificationType activityType = dataConsumedBatchActivity.addNewDataConsumed();
+ //
+ // // add nre data product to the consumed data
+ // DataProductType dataProduct = activityType.addNewDataProduct();
+ // // set data ID and size
+ // dataProduct.setId(dataObj.getId().toString());
+ // dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+ // // set data URLs
+ // List<URI> locations = dataObj.getLocations();
+ // for(URI location : locations){
+ // dataProduct.addLocation(location.toString());
+ // }
+ // // set data timestampp
+ // final Calendar cal = new GregorianCalendar();
+ // cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+ // dataProduct.setTimestamp(cal);
+ //
+ // sendNotification(context, activityType, descriptionAndAnnotation,
+ // "[consumed: ID=<" + dataObj.getId().toString() +
+ // ">; URL=<#" + locations.size() + "><" +
+ // (locations.size() > 0 ? locations.get(0) : "") +
+ // ">]"
+ // );
+ //
+ // return dataObj;
+ // }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the id and locations in a new {@link DataObjImpl} and forwards to the {@link DataObj} overload.
+  */
+ public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations,
+         String... descriptionAndAnnotation) {
+     final DataObj produced = new DataObjImpl(dataId, locations);
+     return dataProduced(context, produced, descriptionAndAnnotation);
+ }
+
+ /**
+  * Variant of {@code dataProduced} that also carries the size of the data: wraps the id, locations and size in
+  * a new {@link DataObjImpl} and forwards to the {@link DataObj} overload.
+  *
+  * @return whatever the {@link DataObj} overload returns for the newly created object
+  */
+ public DataObj dataProduced(WorkflowTrackingContext context, URI dataId, List<URI> locations, int sizeInBytes,
+         String... descriptionAndAnnotation) {
+     final DataObj produced = new DataObjImpl(dataId, locations, sizeInBytes);
+     return dataProduced(context, produced, descriptionAndAnnotation);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Builds a {@code DataProducedDocument} with a single data product (id, size, all locations, timestamp) and
+  * sends it. The default description lists the id, the number of locations and the first location, if any.
+  *
+  * @return the {@code dataObj} that was passed in
+  * @throws RuntimeException
+  *             if {@code context.getMyself()}, {@code dataObj} or its id is {@code null}
+  */
+ public DataObj dataProduced(WorkflowTrackingContext context, DataObj dataObj, String... descriptionAndAnnotation) {
+     // the local entity is only validated here; it is not written into the notification by this method
+     if (context.getMyself() == null) {
+         throw new RuntimeException("Local entity passed was NULL.");
+     }
+     if (dataObj == null) {
+         throw new RuntimeException("Data object passed was NULL.");
+     }
+     if (dataObj.getId() == null) {
+         throw new RuntimeException("Data object's ID was NULL.");
+     }
+
+     DataProducedDocument notification = DataProducedDocument.Factory.newInstance();
+     DataProductNotificationType produced = notification.addNewDataProduced();
+
+     // describe the produced data: id, size, then every known location
+     DataProductType product = produced.addNewDataProduct();
+     product.setId(dataObj.getId().toString());
+     product.setSizeInBytes(dataObj.getSizeInBytes());
+     List<URI> locations = dataObj.getLocations();
+     for (URI location : locations) {
+         product.addLocation(location.toString());
+     }
+
+     // timestamp: activityTimestamp when it is set, otherwise the current time
+     Date stamp = activityTimestamp;
+     if (stamp == null) {
+         stamp = new Date();
+     }
+     final Calendar calendar = new GregorianCalendar();
+     calendar.setTime(stamp);
+     product.setTimestamp(calendar);
+
+     // default description: id, number of locations, and the first location (empty when there is none)
+     final String firstLocation = locations.size() == 0 ? "" : String.valueOf(locations.get(0));
+     StringBuilder summary = new StringBuilder("[produced: ID=<");
+     summary.append(dataObj.getId().toString());
+     summary.append(">; URL=<#").append(locations.size());
+     summary.append("><").append(firstLocation).append(">]");
+
+     sendNotification(context, notification, descriptionAndAnnotation, summary.toString());
+
+     return dataObj;
+ }
+
+ /**
+  * Adds a file/directory that was produced by this invocation to the current dataProduced notification batch.
+  * If the batch document does not exist yet, it is created together with its single root element; later calls
+  * reuse that root element instead of adding another one to the same document.
+  * <p>
+  * NOTE(review): this method still calls {@code sendNotification} with the whole batch document on every
+  * invocation. Whether delivery is actually deferred until {@code flush()} depends on code outside this method
+  * -- confirm.
+  *
+  * @param context
+  *            tracking context handed on to {@code sendNotification}
+  * @param entity
+  *            identity of the workflow/service's invocation that produced this file; only checked for
+  *            {@code null} here
+  * @param dataObj
+  *            data object recording the dataId, local/remote URLs, timestamp of the file/dir, that was returned by
+  *            another data notification method
+  * @param descriptionAndAnnotation
+  *            optional vararg. The first element is used as the human readable description for this notification.
+  *            The subsequent strings need to be serialized XML fragments that are added as annotation to the
+  *            notification.
+  *
+  * @return the data object passed to this method
+  * @throws RuntimeException
+  *             if {@code entity}, {@code dataObj} or the data object's id is {@code null}
+  */
+ protected DataObj dataProducedBatched(WorkflowTrackingContext context, InvocationEntity entity, DataObj dataObj,
+         String... descriptionAndAnnotation) {
+
+     if (entity == null) {
+         throw new RuntimeException("Local entity passed was NULL.");
+     }
+     if (dataObj == null) {
+         throw new RuntimeException("Data object passed was NULL.");
+     }
+     if (dataObj.getId() == null) {
+         throw new RuntimeException("Data object's ID was NULL.");
+     }
+
+     if (dataProducedBatchActivity == null) {
+         // create initial produced notification container
+         dataProducedBatchActivity = DataProducedDocument.Factory.newInstance();
+     }
+
+     // get the existing produced notification container; create the root element only if it is still missing,
+     // so that repeated calls do not keep adding root elements to the same document
+     DataProductNotificationType activityType = dataProducedBatchActivity.getDataProduced();
+     if (activityType == null) {
+         activityType = dataProducedBatchActivity.addNewDataProduced();
+     }
+
+     // add new data product to the produced data
+     DataProductType dataProduct = activityType.addNewDataProduct();
+     // set data ID and size
+     dataProduct.setId(dataObj.getId().toString());
+     dataProduct.setSizeInBytes(dataObj.getSizeInBytes());
+     // set data URLs
+     List<URI> locations = dataObj.getLocations();
+     for (URI location : locations) {
+         dataProduct.addLocation(location.toString());
+     }
+     // set data timestamp: activityTimestamp when it is set, otherwise the current time
+     final Calendar cal = new GregorianCalendar();
+     cal.setTime(activityTimestamp != null ? activityTimestamp : new Date());
+     dataProduct.setTimestamp(cal);
+
+     // default description: id, number of locations, and the first location (empty when there is none)
+     sendNotification(context, dataProducedBatchActivity, descriptionAndAnnotation, "[produced: ID=<"
+             + dataObj.getId().toString() + ">; URL=<#" + locations.size() + "><"
+             + (locations.size() > 0 ? locations.get(0) : "") + ">]");
+
+     return dataObj;
+ }
+
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.airavata.workflow.tracking.util.LinkedMessageQueue;
+import org.apache.airavata.workflow.tracking.util.Timer;
+import org.apache.log4j.Logger;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * Abstract method to publish messages in sync or async mode. In async mode, the messages are kept in an in-memory queue
+ * and published. Calling flush() blocks till all messages are sent and the queue is empty. In sync mode, the call
+ * blocks till the message is transmitted.
+ */
+public abstract class AbstractPublisher implements Runnable, NotificationPublisher {
+
+ protected static final org.apache.log4j.Logger logger = Logger.getLogger(AbstractPublisher.class);
+ protected static final boolean IS_LOG_FINEST = logger.isDebugEnabled();
+ private final LinkedMessageQueue<BrokerEntry> messageQueue;
+ protected static final boolean IS_TIMER = Boolean.getBoolean("ENABLE_TIMER");
+ protected static final Timer notifTP = Timer.init("PubNotif");
+
+ private boolean finished = false;
+ private final Lock LOCK = new ReentrantLock();
+ private final Condition CONDITION = LOCK.newCondition();
+ private static int PUB_ID = 0;
+
+ private final boolean IS_DEFAULT_MODE_ASYNC;
+
+ private boolean deleted = false;
+
+ private final Thread pubThread;
+
+ protected AbstractPublisher(int capacity, boolean defaultAsync) {
+
+ messageQueue = new LinkedMessageQueue<BrokerEntry>(capacity);
+ IS_DEFAULT_MODE_ASYNC = defaultAsync;
+ deleted = false;
+ pubThread = new Thread(this, "PUBLISHER #" + PUB_ID++);
+ pubThread.setDaemon(true);
+ pubThread.start();
+ }
+
+ // public abstract void publishSync(String leadMessage);
+ public final void delete() {
+ deleted = true;
+ pubThread.interrupt();
+ }
+
+ public final boolean isDeleted() {
+ return deleted;
+ }
+
+ public final void publish(String leadMessage) {
+
+ if (IS_DEFAULT_MODE_ASYNC) {
+ publishAsync(leadMessage);
+ } else {
+ publishSync(leadMessage);
+ }
+ }
+
+ public final void publish(XmlObject xmlMessage) {
+
+ if (IS_DEFAULT_MODE_ASYNC) {
+ publishAsync(xmlMessage);
+ } else {
+ publishSync(xmlMessage.xmlText());
+ }
+ }
+
+ public final void publishAsync(String leadMessage) {
+
+ if (IS_LOG_FINEST) {
+ logger.debug("ASYNC: adding to queue, notification: " + leadMessage);
+ }
+ final BrokerEntry brokerEntry = new BrokerEntry(leadMessage);
+ try {
+ messageQueue.put(brokerEntry);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+ }
+ }
+
+ public final void publishAsync(XmlObject xmlMessage) {
+
+ if (IS_LOG_FINEST) {
+ logger.debug("ASYNC: adding to queue, notification: " + xmlMessage);
+ }
+
+ final BrokerEntry brokerEntry = new BrokerEntry(xmlMessage);
+ try {
+ messageQueue.put(brokerEntry);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+ }
+ }
+
+ public final void publishSync(XmlObject xmlMessage) {
+
+ if (IS_LOG_FINEST) {
+ logger.debug("SYNC: sending notification: " + xmlMessage);
+ }
+ publishSync(xmlMessage.xmlText());
+ }
+
+ public final void flush() {
+
+ finished = true;
+ LOCK.lock();
+ while (messageQueue.size() > 0) {
+ try {
+ // wait to be signalled that all messages were sent...
+ CONDITION.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Publisher interrupted. Is it being deleted!?");
+ }
+ }
+ finished = false;
+ CONDITION.signal(); // send ack...
+ LOCK.unlock();
+ return;
+ }
+
+ public final void run() {
+
+ BrokerEntry brokerEntry = null;
+ while (true) {
+
+ try {
+ // get the head from queue, but dont remove it yet
+ // block for message to arrive only if not finished;
+ // if finished, dont block...just quit
+ brokerEntry = finished ? messageQueue.peek() : messageQueue.get();
+ if (brokerEntry == null) {
+
+ // the queue has been flushed
+ if (finished) {
+ LOCK.lock();
+ CONDITION.signal(); // signal flushed queue...
+ try {
+ CONDITION.await(); // and wait for ack.
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ LOCK.unlock();
+ } else { /* ignore...this should not happen */
+ }
+
+ // go back to to start and wait for new message in flushed queue...
+ continue;
+
+ } else {
+
+ if (IS_LOG_FINEST) {
+ logger.debug("ASYNC: sending notification: " + brokerEntry.getMessage());
+ }
+
+ // publish message
+ publishSync(brokerEntry.getMessage());
+
+ // remove the published head from queue
+ messageQueue.poll();
+ }
+
+ } catch (InterruptedException e) {
+ if (deleted)
+ break;
+ else
+ logger.fatal("Interrupted when queue size: " + messageQueue.size() + ". deleted == false", e);
+ } catch (RuntimeException e) {
+
+ logger.fatal("Runtime Error: " + e.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Runtime Error at message: "
+ + (brokerEntry != null ? brokerEntry.getMessage() : "NULL") + "; queue size: "
+ + messageQueue.size(), e);
+ }
+ // fixme: we should remove the entry from queue if it cannot be sent...
+ // otherwise, if broker is down, this may cause an infinite loop!!!
+ }
+ }
+ }
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/BrokerEntry.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import org.apache.xmlbeans.XmlObject;
+
+ /**
+  * Immutable holder for one message, kept either as a plain string or as an XmlObject.
+  */
+ final class BrokerEntry {
+
+     private final String message;
+     private final XmlObject xml;
+
+     /**
+      * Creates an entry backed by a string message; the XML object is left null.
+      */
+     public BrokerEntry(String message) {
+         this.xml = null;
+         this.message = message;
+     }
+
+     /**
+      * Creates an entry backed by an XML object; the string message is left null.
+      */
+     public BrokerEntry(XmlObject xml) {
+         this.message = null;
+         this.xml = xml;
+     }
+
+     /**
+      * @return the string message if one was given, otherwise the text of the XML object
+      */
+     public final String getMessage() {
+         return message != null ? message : xml.xmlText();
+     }
+
+     /**
+      * @return the XML object, or null for an entry created from a string
+      */
+     public final XmlObject getXmlObject() {
+         return xml;
+     }
+ }
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/LoopbackPublisher.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.io.PrintStream;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.common.ConstructorConsts;
+import org.apache.airavata.workflow.tracking.common.ConstructorProps;
+import org.apache.airavata.workflow.tracking.util.MessageUtil;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * acts as a workflow tracking NotificationPublisher that calls a workflow tracking callback listener without having to
+ * pass though a real notification broker. Default listener prints to stdout
+ */
+public class LoopbackPublisher extends AbstractPublisher implements NotificationPublisher {
+
+ private Callback listener;
+ private String topic;
+ private static int globalCount = 0;
+
+ public LoopbackPublisher(Callback listener_, String topic_) {
+ super(10, false); // capacity, async
+ topic = topic_;
+ listener = listener_;
+ if (listener == null) {
+ listener = new Callback() {
+ int count = 0;
+
+ public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+ System.out
+ .printf("----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+ count, globalCount, topic, notificationType, messageObj);
+ count++;
+ globalCount++;
+ }
+ };
+ }
+ }
+
+ public LoopbackPublisher(final PrintStream out_, String topic_) {
+ this(new Callback() {
+ int count = 0;
+
+ public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+ out_.printf(
+ "----\nReceived Message [L:%d/G:%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+ count, globalCount, topic, notificationType, messageObj);
+ count++;
+ globalCount++;
+ }
+ }, topic_);
+ }
+
+ public LoopbackPublisher(String topic_) {
+ this(System.out, topic_);
+ }
+
+ public LoopbackPublisher(ConstructorProps props) {
+ this((Callback) props.get(ConstructorConsts.CALLBACK_LISTENER), (String) props.get(ConstructorConsts.TOPIC));
+ }
+
+ /**
+ * Method publishSync
+ *
+ * @param message
+ * a String message that should be a valid XML String (serialized XML document)
+ *
+ */
+ public void publishSync(String message) {
+
+ try {
+ XmlObject xmlMessage = XmlObject.Factory.parse(message);
+ NotificationType type = MessageUtil.getType(xmlMessage);
+ listener.deliverMessage(topic, type, xmlMessage);
+ } catch (XmlException e) {
+ System.err.println("Error parsing workflow tracking message : [" + message + "]\n" + "as an XML Object");
+ e.printStackTrace();
+ }
+ }
+
+ public static void main(String args[]) {
+
+ LoopbackPublisher publisher = new LoopbackPublisher(new Callback() {
+ int count = 0;
+
+ public void deliverMessage(String topic, NotificationType notificationType, XmlObject messageObj) {
+
+ System.out.printf("----\nReceived Message [%d] on topic [%s] of type [%s] with payload:\n[%s]\n====\n",
+ count++, topic, notificationType, messageObj);
+ }
+ }, "testTopic");
+
+ // create & publish log message
+ {
+ LogInfoDocument logMsg = LogInfoDocument.Factory.newInstance();
+ BaseNotificationType log = logMsg.addNewLogInfo();
+ // add timestamp
+ final Calendar cal = new GregorianCalendar();
+ cal.setTime(new Date());
+ log.setTimestamp(cal);
+ // add notification source
+ BaseIDType baseID = BaseIDType.Factory.newInstance();
+ baseID.setServiceID("http://tempuri.org/test_service");
+ log.addNewNotificationSource().set(baseID);
+ // add description
+ log.setDescription("A test message");
+
+ // publish message as XML Object
+ publisher.publish(logMsg);
+ }
+ // create & publish publishURl message
+ {
+ // create publish URL message
+ PublishURLDocument pubMsg = PublishURLDocument.Factory.newInstance();
+ PublishURLDocument.PublishURL pub = pubMsg.addNewPublishURL();
+ // add timestamp
+ final Calendar cal = new GregorianCalendar();
+ cal.setTime(new Date());
+ pub.setTimestamp(cal);
+ // add notification source
+ BaseIDType baseID = BaseIDType.Factory.newInstance();
+ baseID.setServiceID("http://tempuri.org/test_service");
+ pub.addNewNotificationSource().set(baseID);
+ pub.setTitle("Some URL's Title");
+ pub.setLocation("http://tempuri.org/published_url");
+
+ // publish message as XML string
+ publisher.publish(pubMsg.xmlText());
+ }
+ }
+
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/NotificationPublisher.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import org.apache.xmlbeans.XmlObject;
+
+ /**
+  * Contract for publishing notification messages, given either as a string or as an XmlObject, in a default,
+  * synchronous or asynchronous mode.
+  */
+ public interface NotificationPublisher {
+
+     /** Publishes the message using the publisher's default mode. */
+     void publish(String leadMessage);
+
+     /** Publishes the XML message using the publisher's default mode. */
+     void publish(XmlObject xmlMessage);
+
+     /** Publishes the message synchronously. */
+     void publishSync(String leadMessage);
+
+     /** Publishes the XML message synchronously. */
+     void publishSync(XmlObject xmlMessage);
+
+     /** Publishes the message asynchronously. */
+     void publishAsync(String leadMessage);
+
+     /** Publishes the XML message asynchronously. */
+     void publishAsync(XmlObject xmlMessage);
+
+     /** Flushes messages that were accepted for asynchronous publishing but not yet sent. */
+     void flush();
+
+     /** Deletes this publisher. */
+     void delete();
+ }
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/WSMPublisher.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.publish;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URL;
+import java.util.Properties;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.commons.WorkFlowUtils;
+import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
+import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
+import org.apache.airavata.workflow.tracking.util.ConfigKeys;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Publish WS-Eventing messages using WS-Messenger client API
+ *
+ */
+public class WSMPublisher extends AbstractPublisher implements NotificationPublisher {
+
+ protected final WseMsgBrokerClient broker;
+ protected final EndpointReference brokerEpr;
+ protected Properties configs = new Properties();
+
+ private Logger log = Logger.getLogger(WSMPublisher.class);
+
+ public WSMPublisher(WorkflowTrackingContext context) {
+ this(10, context.isEnableAsyncPublishing(), context.getBrokerEpr());
+ }
+
+ public WSMPublisher(int capacity, boolean defaultAsync, String brokerLoc, String topic) throws IOException {
+ super(capacity, defaultAsync);
+ URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+ configs.load(configURL.openStream());
+ broker = new WseMsgBrokerClient();
+ brokerEpr = broker.createEndpointReference(brokerLoc, topic);
+ broker.init(brokerEpr.getAddress());
+ }
+
+ public WSMPublisher(int capacity, boolean defaultAsync, EndpointReference brokerEpr_)
+ throws WorkflowTrackingException {
+ super(capacity, defaultAsync);
+ try {
+ URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+ configs.load(configURL.openStream());
+ brokerEpr = brokerEpr_;
+ broker = new WseMsgBrokerClient();
+ broker.init(brokerEpr_.getAddress());
+ } catch (IOException e) {
+ throw new WorkflowTrackingException(e);
+ }
+ }
+
+ public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_) throws IOException {
+
+ this(capacity, defaultAsync, brokerEpr_, false);
+
+ }
+
+ public WSMPublisher(int capacity, boolean defaultAsync, String brokerEpr_, boolean isXmlEpr) throws IOException {
+
+ super(capacity, defaultAsync);
+ URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+ configs.load(configURL.openStream());
+
+ if (!isXmlEpr) {
+ brokerEpr = new EndpointReference(brokerEpr_);// EndpointReferenceHelper.fro(brokerEpr_);
+
+ } else {
+ brokerEpr = EndpointReferenceHelper.fromString(brokerEpr_);
+ }
+
+ broker = new WseMsgBrokerClient();
+ broker.init(brokerEpr.getAddress());
+ }
+
+ /**
+ * Method publishSync
+ *
+ * @param leadMessage
+ * a String
+ *
+ */
+ public void publishSync(String leadMessage) {
+ if (isDeleted())
+ throw new RuntimeException("Publisher has been deleted!");
+ if (IS_LOG_FINEST) {
+ logger.debug("publishing notification to messenger broker: " + leadMessage);
+ }
+ try {
+ OMElement msg = WorkFlowUtils.reader2OMElement(new StringReader(leadMessage));
+ broker.publish(null, msg);
+
+ } catch (MsgBrokerClientException e) {
+ log.error("unablet to publish the lead message", e);
+ } catch (XMLStreamException e) {
+ log.error("unable to parse the load message - " + leadMessage, e);
+ }
+ }
+
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataDurationImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.net.URI;
+
+import org.apache.airavata.workflow.tracking.common.DataDurationObj;
+import org.apache.airavata.workflow.tracking.common.DataObj;
+
+ /**
+  * Duration record that additionally carries a data object and a remote location.
+  */
+ public class DataDurationImpl extends DurationImpl implements DataDurationObj {
+
+     protected DataObj dataObj;
+     protected URI remoteLocation;
+
+     /**
+      * Creates a duration whose start time is set to now.
+      *
+      * @param dataObj_
+      *            the data object this duration refers to
+      * @param remoteLocation_
+      *            the remote location associated with the data object
+      */
+     public DataDurationImpl(DataObj dataObj_, URI remoteLocation_) {
+
+         super(); // set start time to now
+         this.remoteLocation = remoteLocation_;
+         this.dataObj = dataObj_;
+     }
+
+     /**
+      * Creates a duration with a fixed, externally supplied length.
+      *
+      * @param dataObj_
+      *            the data object this duration refers to
+      * @param remoteLocation_
+      *            the remote location associated with the data object
+      * @param fixedDuration
+      *            the duration value to report
+      */
+     public DataDurationImpl(DataObj dataObj_, URI remoteLocation_, long fixedDuration) {
+
+         super(fixedDuration); // set duration to passed value
+         this.remoteLocation = remoteLocation_;
+         this.dataObj = dataObj_;
+     }
+
+     /**
+      * @return the data object passed at construction
+      */
+     public DataObj getDataObj() {
+         return this.dataObj;
+     }
+
+     /**
+      * @return the remote location passed at construction
+      */
+     public URI getRemoteLocation() {
+         return this.remoteLocation;
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DataObjImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.airavata.workflow.tracking.common.DataObj;
+
+ /**
+  * Data object identified by a URI, with a list of locations and a lazily computed size in bytes.
+  */
+ public class DataObjImpl implements DataObj {
+
+     protected URI dataId;
+     protected List<URI> locations;
+     // negative while the size has not been supplied or computed
+     protected long sizeInBytes = -1;
+
+     /**
+      * @param dataId_
+      *            ID of the data object; must not be null or empty
+      * @param location_
+      *            locations of the data object; may be null
+      * @throws RuntimeException
+      *             if the ID is null or empty
+      */
+     public DataObjImpl(URI dataId_, List<URI> location_) {
+
+         dataId = dataId_;
+         if (dataId == null || dataId.toString().length() == 0)
+             throw new RuntimeException("Data ID cannot be NULL or empty");
+
+         locations = location_;
+     }
+
+     /**
+      * @param dataId_
+      *            ID of the data object; must not be null or empty
+      * @param location_
+      *            locations of the data object; may be null
+      * @param sizeInBytes_
+      *            size of the data object; a negative value leaves the size to be computed on demand
+      * @throws RuntimeException
+      *             if the ID is null or empty
+      */
+     public DataObjImpl(URI dataId_, List<URI> location_, long sizeInBytes_) {
+
+         this(dataId_, location_);
+         sizeInBytes = sizeInBytes_;
+     }
+
+     /**
+      * @return the ID of the data object
+      */
+     public URI getId() {
+
+         return dataId;
+     }
+
+     /**
+      * @return the first location, or null when there are no locations
+      */
+     public URI getLocalLocation() {
+
+         return locations != null && locations.size() > 0 ? locations.get(0) : null;
+     }
+
+     /**
+      * @return the list of locations as passed at construction; may be null
+      */
+     public List<URI> getLocations() {
+
+         return locations;
+     }
+
+     /**
+      * Returns the size in bytes. If no size is known yet and the first location is a local file or directory, the
+      * size is calculated from the file system and remembered.
+      *
+      * @return the size in bytes, or the stored negative value when it is unknown and cannot be calculated
+      */
+     public long getSizeInBytes() {
+
+         // skip getting bytes if already calculated or not possible to calculate
+         if (sizeInBytes >= 0 || locations == null || locations.size() == 0)
+             return sizeInBytes;
+
+         // check if the location is a local file. If so, we calculate the size.
+         URI location = locations.get(0);
+         String scheme = location.getScheme();
+         String authority = location.getAuthority();
+         if ((scheme == null && authority == null) || "file".equals(scheme)) {
+             sizeInBytes = getFileSize(new File(location.getPath()));
+         }
+         return sizeInBytes;
+     }
+
+     /**
+      * @param file
+      *            a file or directory
+      * @return the length of the file, or the total length of all files below the directory
+      */
+     protected static final long getFileSize(File file) {
+         if (file.isDirectory()) {
+             return getDirSize(file, 0, true);
+         } else {
+             return file.length();
+         }
+     }
+
+     /**
+      * Adds the lengths of the files in a directory to a running total.
+      *
+      * @param dir
+      *            the directory to scan
+      * @param size
+      *            the running total so far
+      * @param recurse
+      *            whether sub-directories are included
+      * @return the running total plus the lengths of the files found
+      */
+     private static final long getDirSize(File dir, long size, boolean recurse) {
+         File[] files = dir.listFiles();
+         if (files == null)
+             return size;
+         for (File child : files) {
+             if (child.isDirectory()) {
+                 if (recurse) {
+                     // the recursive call already returns the running total, so assign it; adding it to size
+                     // would count everything seen so far a second time
+                     size = getDirSize(child, size, recurse);
+                 }
+             } else {
+                 size += child.length();
+             }
+         }
+         return size;
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/DurationImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import org.apache.airavata.workflow.tracking.common.DurationObj;
+
+/**
+ * Convinience class to record the state of computation related notifications.
+ */
+public class DurationImpl implements DurationObj {
+
+ protected long startTimeMillis = Long.MAX_VALUE;
+ protected long endTimeMillis = Long.MIN_VALUE;
+ protected long fixedDuration = Long.MIN_VALUE;
+ protected boolean isFixedDuration = false;
+
+ public DurationImpl() {
+ startTimeMillis = System.currentTimeMillis();
+ }
+
+ public DurationImpl(long fixedDuration_) {
+ isFixedDuration = true;
+ fixedDuration = fixedDuration_;
+ }
+
+ public long markStartTimeMillis() {
+
+ this.startTimeMillis = System.currentTimeMillis();
+ return startTimeMillis;
+ }
+
+ public long getStartTimeMillis() {
+
+ return startTimeMillis;
+ }
+
+ public long markEndTimeMillis() {
+
+ this.endTimeMillis = System.currentTimeMillis();
+ return endTimeMillis;
+ }
+
+ public long getEndTimeMillis() {
+
+ return endTimeMillis;
+ }
+
+ public long getDurationMillis() {
+
+ if (isFixedDuration)
+ return fixedDuration;
+ return endTimeMillis - startTimeMillis;
+ }
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationContextImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import org.apache.airavata.workflow.tracking.common.InvocationContext;
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+
+ /**
+  * Invocation entity that additionally keeps a reference to a remote entity.
+  */
+ public class InvocationContextImpl extends InvocationEntityImpl implements InvocationContext {
+
+     InvocationEntity remoteEntity;
+
+     /**
+      * @param localEntity_
+      *            entity handed to the superclass constructor
+      * @param remoteEntity_
+      *            entity returned by getRemoteEntity()
+      */
+     public InvocationContextImpl(InvocationEntity localEntity_, InvocationEntity remoteEntity_) {
+         super(localEntity_);
+         this.remoteEntity = remoteEntity_;
+     }
+
+     /**
+      * @return the remote entity passed at construction
+      */
+     public InvocationEntity getRemoteEntity() {
+         return this.remoteEntity;
+     }
+ }
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/state/InvocationEntityImpl.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.state;
+
+import java.net.URI;
+
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+
+/**
+ * Basic {@link InvocationEntity} implementation that stores the workflow ID, service ID, workflow node ID and workflow
+ * timestep of an entity. Only the service ID is mandatory; the other three values may be null.
+ */
+public class InvocationEntityImpl implements InvocationEntity {
+
+    protected URI workflowID;
+    protected URI serviceID;
+    protected String workflowNodeID;
+    protected Integer workflowTimestep;
+
+    /**
+     * Constructor used when only service ID is available (i.e. entity not in the context of an invocation). The
+     * workflow ID, workflow node ID and workflow timestep are left null.
+     *
+     * @param serviceID_
+     *            an URI; must not be null
+     * @throws IllegalArgumentException
+     *             if serviceID_ is null
+     */
+    public InvocationEntityImpl(URI serviceID_) {
+        // Delegate so that the null check and the field assignments live in one place.
+        this(null, serviceID_, null, null);
+    }
+
+    /**
+     * Constructor used when all IDs are potentially available (i.e. entity in the context of an invocation).
+     *
+     * @param workflowID_
+     *            an URI; may be null
+     * @param serviceID_
+     *            an URI; must not be null
+     * @param workflowNodeID_
+     *            a String; may be null
+     * @param workflowTimestep_
+     *            an Integer; may be null
+     * @throws IllegalArgumentException
+     *             if serviceID_ is null
+     */
+    public InvocationEntityImpl(URI workflowID_, URI serviceID_, String workflowNodeID_, Integer workflowTimestep_) {
+
+        if (serviceID_ == null) {
+            // IllegalArgumentException is a RuntimeException, so callers catching RuntimeException are unaffected.
+            throw new IllegalArgumentException("ServiceID passed was null!");
+        }
+
+        workflowID = workflowID_;
+        serviceID = serviceID_;
+        workflowNodeID = workflowNodeID_;
+        workflowTimestep = workflowTimestep_;
+    }
+
+    /**
+     * Copy Constructor. Reads the four identifiers from the source through its getters.
+     *
+     * @param source
+     *            an InvocationEntity; must not be null (a null source fails with a NullPointerException)
+     * @throws IllegalArgumentException
+     *             if the source's service ID is null
+     */
+    protected InvocationEntityImpl(InvocationEntity source) {
+        this(source.getWorkflowID(), source.getServiceID(), source.getWorkflowNodeID(), source.getWorkflowTimestep());
+    }
+
+    /**
+     * @return the workflow node ID, or null if none was supplied
+     */
+    public String getWorkflowNodeID() {
+        return workflowNodeID;
+    }
+
+    /**
+     * @return the service ID
+     */
+    public URI getServiceID() {
+        return serviceID;
+    }
+
+    /**
+     * @return the workflow timestep, or null if none was supplied
+     */
+    public Integer getWorkflowTimestep() {
+        return workflowTimestep;
+    }
+
+    /**
+     * @return the workflow ID, or null if none was supplied
+     */
+    public URI getWorkflowID() {
+        return workflowID;
+    }
+
+    /**
+     * Copies the identifiers held by this entity into a newly created BaseIDType. Only non-null identifiers are set on
+     * the result; the URIs are stored in their string form.
+     *
+     * @return a new BaseIDType populated from this entity
+     */
+    public BaseIDType toBaseIDType() {
+
+        BaseIDType baseID = BaseIDType.Factory.newInstance();
+        // The fields are protected and non-final, so each one is re-checked here rather than relying on the
+        // constructor's validation.
+        if (serviceID != null) {
+            baseID.setServiceID(serviceID.toString());
+        }
+        if (workflowID != null) {
+            baseID.setWorkflowID(workflowID.toString());
+        }
+        if (workflowTimestep != null) {
+            baseID.setWorkflowTimestep(workflowTimestep);
+        }
+        if (workflowNodeID != null) {
+            baseID.setWorkflowNodeID(workflowNodeID);
+        }
+
+        return baseID;
+    }
+
+}
Added: incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java?rev=1142475&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java (added)
+++ incubator/airavata/ws-messaging/trunk/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/LeadNotificationHandler.java Sun Jul 3 18:32:59 2011
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.tracking.impl.subscription;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.client.Subscription;
+import org.apache.airavata.workflow.tracking.util.MessageUtil;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.log4j.Logger;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * Utility for clients to subscribe and receive Lead notifications using the new message schema. The agent implements
+ * the {@link Callback} interface and creates the notification handler with the broker location, the topic, the
+ * callback and the port for the local consumer server. The deliverMessage method of the Callback is invoked when a
+ * notification arrives and its body can be parsed; a notification that cannot be parsed is logged and dropped.
+ * Subscriptions are meant to be created through LeadNotificationManager rather than by using this class directly.
+ */
+public class LeadNotificationHandler implements ConsumerNotificationHandler {
+
+    private static final Logger logger = Logger.getLogger(LeadNotificationHandler.class);
+
+    /** Consumer server port used when the caller passes 0 as the port. */
+    private static final int DEFAULT_CONSUMER_SERVER_PORT = 2222;
+
+    // Not final: handleNotification replaces it with the topic found in an incoming message header.
+    // NOTE(review): this means a later notification without a Topic header is delivered under the previously
+    // seen topic, and the field is written without synchronization - confirm this is intended.
+    private String topic;
+
+    private final String brokerLoc;
+
+    private final Callback callback;
+
+    private final int consumerServerPort;
+
+    /**
+     * Creates a handler; no network activity happens until {@link #createSubscription()} is called.
+     *
+     * @param brokerLoc
+     *            location of the message broker to subscribe at
+     * @param topic
+     *            topic to subscribe to
+     * @param callback
+     *            receiver of the parsed notifications
+     * @param port
+     *            port for the local consumer server; 0 selects the default port 2222
+     */
+    public LeadNotificationHandler(String brokerLoc, String topic, Callback callback, int port) {
+        this.consumerServerPort = (port == 0) ? DEFAULT_CONSUMER_SERVER_PORT : port;
+        this.brokerLoc = brokerLoc;
+        this.topic = topic;
+        this.callback = callback;
+    }
+
+    /**
+     * NON API Method. Use LeadNotificationManager.CreateSubscription() method to create a subscription.
+     *
+     * Starts a consumer server on the configured port with this handler as its notification handler, then subscribes
+     * the server's first endpoint reference to the topic at the broker.
+     *
+     * @return the subscription wrapping the consumer server, the subscription id, the topic, the callback and the
+     *         broker location
+     * @throws Exception
+     *             if initializing the broker client, starting the consumer server or subscribing fails
+     */
+    public Subscription createSubscription() throws Exception {
+        WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
+        wseClient.init(brokerLoc);
+        logger.debug("Starting Subscription for topic [" + topic + "]at the broker location:" + brokerLoc);
+        ConsumerServer xs = new ConsumerServer(consumerServerPort, this);
+        xs.start();
+        String subscriptionId = wseClient.subscribe(xs.getConsumerServiceEPRs()[0], topic, null);
+        logger.info("The consumer server started on EPR" + xs.getConsumerServiceEPRs()[0]);
+        return new Subscription(xs, subscriptionId, topic, callback, brokerLoc);
+    }
+
+    /**
+     * NON API method. Called by the message broker when a message arrives at the subscribed topic. Should NOT be
+     * called locally. Parses the first element of the SOAP body and hands it to the Callback's deliverMessage method
+     * together with the current topic and the notification type. If no topic is known, or the body cannot be
+     * serialized or parsed, the notification is logged and dropped without invoking the callback.
+     *
+     * @param envelope
+     *            the soap envelope containing the notification message
+     */
+    public void handleNotification(SOAPEnvelope envelope) {
+        OMElement messageContent = envelope.getBody().getFirstElement();
+        updateTopicFromHeader(envelope.getHeader());
+
+        if (topic == null) {
+            logger.info("Notification came without a Notification Topic:" + envelope);
+            return;
+        }
+
+        XmlObject messageObj = parseMessageContent(messageContent);
+        if (messageObj == null) {
+            // The failure has been logged already; never hand a null message to the callback.
+            return;
+        }
+
+        NotificationType type = MessageUtil.getType(messageObj);
+        this.callback.deliverMessage(topic, type, messageObj);
+    }
+
+    /**
+     * Replaces the current topic with the one carried in the "Topic" header element, when such an element with a
+     * child element is present. The topic is taken as the second ':'-separated piece of the serialized child
+     * element; the current topic is kept when the header, the element or that piece is missing.
+     */
+    private void updateTopicFromHeader(SOAPHeader soapHeader) {
+        if (soapHeader == null) {
+            return;
+        }
+        OMElement topicEl = soapHeader.getFirstChildWithName(new QName(null, "Topic"));
+        if (topicEl == null) {
+            return;
+        }
+        OMElement widgetTopicOMEl = topicEl.getFirstElement();
+        if (widgetTopicOMEl == null) {
+            return;
+        }
+
+        String widgetTopicString;
+        try {
+            widgetTopicString = widgetTopicOMEl.toStringWithConsume();
+        } catch (XMLStreamException e) {
+            logger.error("error serializing notification topic header", e);
+            return;
+        }
+
+        String[] topicSubstrings = widgetTopicString.split(":");
+        if (topicSubstrings.length > 1) {
+            topic = topicSubstrings[1];
+        }
+    }
+
+    /**
+     * Serializes the given element and parses it into an XmlObject. Returns null, after logging the cause, when the
+     * element is missing or cannot be serialized or parsed.
+     */
+    private XmlObject parseMessageContent(OMElement messageContent) {
+        if (messageContent == null) {
+            logger.error("notification body contains no message element");
+            return null;
+        }
+        try {
+            return XmlObject.Factory.parse(messageContent.toStringWithConsume());
+        } catch (XMLStreamException e) {
+            logger.error("error serializing message content", e);
+        } catch (XmlException e) {
+            logger.fatal("error parsing message content: " + messageContent, e);
+        }
+        return null;
+    }
+
+}