You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2011/10/27 21:41:44 UTC
svn commit: r1189967 [2/2] - in /incubator/airavata/trunk/modules:
commons/workflow-tracking/
commons/workflow-tracking/src/test/java/org/apache/airavata/workflow/tracking/tests/
commons/workflow-tracking/src/test/resources/ test-suite/src/test/java/or...
Added: incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/SimpleWorkflowExecution.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/SimpleWorkflowExecution.java?rev=1189967&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/SimpleWorkflowExecution.java (added)
+++ incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/SimpleWorkflowExecution.java Thu Oct 27 19:41:43 2011
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.test.suite.workflowtracking.tests.samples.workflow;
+
+import java.io.ByteArrayOutputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+
+import org.apache.airavata.workflow.tracking.GenericNotifier;
+import org.apache.airavata.workflow.tracking.Notifier;
+import org.apache.airavata.workflow.tracking.NotifierFactory;
+import org.apache.airavata.workflow.tracking.ProvenanceNotifier;
+import org.apache.airavata.workflow.tracking.WorkflowNotifier;
+import org.apache.airavata.workflow.tracking.client.Subscription;
+import org.apache.airavata.workflow.tracking.common.AnnotationConsts;
+import org.apache.airavata.workflow.tracking.common.AnnotationProps;
+import org.apache.airavata.workflow.tracking.common.ConstructorConsts;
+import org.apache.airavata.workflow.tracking.common.ConstructorProps;
+import org.apache.airavata.workflow.tracking.common.InvocationContext;
+import org.apache.airavata.workflow.tracking.common.InvocationEntity;
+import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.util.XMLPrettyPrinter;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+import org.apache.xmlbeans.impl.tool.XSTCTester.TestCase;
+import org.junit.*;
+
+public class SimpleWorkflowExecution extends TestCase {
+
+ /**
+ * Subscription handle. It is declared here but is neither assigned nor read anywhere in this class.
+ *
+ */
+ Subscription subscription;
+ Properties configs = new Properties();
+ String BROKER_URL = "http://127.0.0.1:8080/axis2/services/EventingService/topic/Foo";
+
+ /**
+ * Class-level JUnit setup hook. The body is empty.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ /**
+ * Class-level JUnit teardown hook. The body is empty.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+    /**
+     * Populates the sample data-product identifiers, their replica locations and the
+     * workflow/service identifiers used by the run methods of this class.
+     * <p>
+     * Nothing is read from a configuration file here: {@code BROKER_URL} keeps the value
+     * from its field initializer.
+     *
+     * @throws Exception
+     *             if one of the URI literals below cannot be parsed
+     */
+    @Before
+    public void setUp() throws Exception {
+        // Logical identifiers of the six sample data products.
+        DATA_URI_1 = new URI("lead:djsdhgfsdf");
+        DATA_URI_2 = new URI("lead:skfdjhgkfg");
+        DATA_URI_3 = new URI("lead:hgkdfhkgfd");
+        DATA_URI_4 = new URI("lead:lshjkhdgdf");
+        DATA_URI_5 = new URI("lead:fghkfhgfdg");
+        DATA_URI_6 = new URI("lead:dshiuwekds");
+
+        // Replica locations for each data product; only the third one has two replicas.
+        DATA_URLS_1 = Arrays.asList(new URI("http://dataserver/foo/1"));
+        DATA_URLS_2 = Arrays.asList(new URI("http://dataserver/bar/2"));
+        DATA_URLS_3 = Arrays.asList(new URI("http://dataserver/fubar/3"), new URI("http://datarepos/foobar/3"));
+        DATA_URLS_4 = Arrays.asList(new URI("http://datarepos/fee/4"));
+        DATA_URLS_5 = Arrays.asList(new URI("http://datarepos/fie/5"));
+        DATA_URLS_6 = Arrays.asList(new URI("http://datarepos/foe/fum/6"));
+
+        // The current time is appended so the identifiers differ from one run to the next.
+        final long stamp = System.currentTimeMillis();
+        SERVICE_0 = new URI("http://tempuri.org/root_service/" + stamp);
+        WORKFLOW_1 = new URI("http://tempuri.org/workflow1/" + stamp);
+        SERVICE_1 = new URI("http://tempuri.org/service1/" + stamp);
+        SERVICE_2 = new URI("http://tempuri.org/service2/" + stamp);
+    }
+
+ /**
+ * Per-test JUnit teardown hook. The body is empty.
+ */
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public URI WORKFLOW_1, SERVICE_1, SERVICE_2, SERVICE_0;
+
+ private URI DATA_URI_1, DATA_URI_2, DATA_URI_3, DATA_URI_4, DATA_URI_5, DATA_URI_6;
+ private List<URI> DATA_URLS_1, DATA_URLS_2, DATA_URLS_3, DATA_URLS_4, DATA_URLS_5, DATA_URLS_6;
+
+ /** Holder for the header and body of a success response returned by the sample run methods. */
+ static class Success {
+ XmlObject header;
+ XmlObject body;
+ }
+
+ /** Holder for the header and body of a fault response returned by the sample run methods. */
+ static class Failure {
+ XmlObject header;
+ XmlObject body;
+ }
+
+ public Object runWorkflow1(InvocationEntity myInvoker, URI myWorkflowID, URI myServiceID, String myNodeID,
+ Integer myTimestep) throws XmlException {
+
+ assert WORKFLOW_1.equals(myServiceID);
+
+ WorkflowNotifier notifier = NotifierFactory.createWorkflowNotifier();
+ WorkflowTrackingContext context = notifier.createTrackingContext(new Properties(), BROKER_URL, myWorkflowID,
+ myServiceID, myNodeID, myTimestep);
+
+ InvocationContext myInvocation = notifier.workflowInvoked(context, myInvoker,
+ XmlObject.Factory.parse("<soapHeader/>"),
+ XmlObject.Factory.parse("<soapBody>input1,input2</soapBody>"), "This is the start of this workflow");
+
+ // BEGIN SERVICE1
+ {
+ // prepare to invoke service1
+ InvocationEntity service1 = notifier.createEntity(myServiceID, SERVICE_1, "NODE1", 1);
+ InvocationContext service1Invocation = notifier.invokingService(context, service1,
+ XmlObject.Factory.parse("<soapHeader/>"), XmlObject.Factory.parse("<soapBody>input1</soapBody>"),
+ "This workflow is invoking a service");
+
+ Object result = null;
+ try {
+ // prepare to invoke service1
+ result = runService1(service1, service1.getWorkflowID(), service1.getServiceID(),
+ service1.getWorkflowNodeID(), service1.getWorkflowTimestep());
+
+ // If this were an async invocation, we would have finished
+ // sending request.
+ // we acknowledge the successful request.
+ notifier.invokingServiceSucceeded(context, service1Invocation, "Invoked service1 successfully");
+
+ } catch (Exception ex) {
+ // If there had been a problem sending the request on the wire,
+ // we acknowledge a failed request.
+ notifier.invokingServiceFailed(context, service1Invocation, ex, "Failed to invoke service1");
+ }
+
+ // At this point, we would have waited for response from service1 if
+ // it were an async call.
+ // assume we received response at this point and continue.
+ if (result instanceof Success) {
+ notifier.receivedResult(context, service1Invocation, ((Success) result).header,
+ ((Success) result).body, "got success response from service1");
+ } else if (result instanceof Failure) {
+ notifier.receivedFault(context, service1Invocation, ((Failure) result).header, ((Failure) result).body,
+ "got fault response from service1");
+ }
+
+ }
+
+ // BEGIN SERVICE2
+ {
+ // prepare to invoke service1
+ InvocationEntity service2 = notifier.createEntity(myServiceID, SERVICE_2, "NODE2", 2);
+ InvocationContext service1Invocation = notifier.invokingService(context, service2,
+ XmlObject.Factory.parse("<soapHeader/>"),
+ XmlObject.Factory.parse("<soapBody>input2,input3</soapBody>"),
+ "This workflow is invoking another service");
+
+ Object result = null;
+ try {
+ // prepare to invoke service2
+ result = runService2(service2, service2.getWorkflowID(), service2.getServiceID(),
+ service2.getWorkflowNodeID(), service2.getWorkflowTimestep());
+
+ // If this were an async invocation, we would have finished
+ // sending request.
+ // we acknowledge the successful request.
+ notifier.invokingServiceSucceeded(context, service1Invocation, "Invoked service2 successfully");
+
+ } catch (Exception ex) {
+ // If there had been a problem sending the request on the wire,
+ // we acknowledge a failed request.
+ notifier.invokingServiceFailed(context, service1Invocation, ex, "Failed to invoke service2");
+ }
+
+ // At this point, we would have waited for response from service1 if
+ // it were an async call.
+ // assume we received response at this point and continue.
+ if (result instanceof Success) {
+ notifier.receivedResult(context, service1Invocation, ((Success) result).header,
+ ((Success) result).body, "got success response from service2");
+ } else if (result instanceof Failure) {
+ notifier.receivedFault(context, service1Invocation, ((Failure) result).header, ((Failure) result).body,
+ "got fault response from service2");
+ }
+
+ }
+
+ Object result = null;
+ notifier.sendingResult(context, myInvocation, "sending result back to the invoker of this workflow");
+ try {
+ result = new Success();
+ notifier.sendingResponseSucceeded(context, myInvocation, "sent result to invoker");
+ } catch (Exception ex) {
+ notifier.sendingResponseFailed(context, myInvocation, ex);
+ }
+
+ return result;
+ }
+
+    /**
+     * Plays the role of service1: reports its invocation, the two data products it consumes
+     * and the one it produces, then builds a response and reports sending it.
+     * <p>
+     * The outcome is selected by the local {@code successResult} flag, which is fixed to
+     * {@code true}, so the fault path is present only for illustration.
+     *
+     * @param myInvoker
+     *            entity reported as the invoker of this service
+     * @param myWorkflowID
+     *            workflow ID handed to the tracking context
+     * @param myServiceID
+     *            service ID of this service; asserted to equal {@code SERVICE_1}
+     * @param myNodeID
+     *            node ID handed to the tracking context
+     * @param myTimestep
+     *            timestep handed to the tracking context
+     * @return the {@code Success} (or, on the fault path, {@code Failure}) that was reported
+     * @throws XmlException
+     *             if one of the XML literals cannot be parsed
+     */
+    public Object runService1(InvocationEntity myInvoker, URI myWorkflowID, URI myServiceID, String myNodeID,
+            int myTimestep) throws XmlException {
+
+        // The ID we were called with must be the ID this service believes it has.
+        assert SERVICE_1.equals(myServiceID);
+
+        // A provenance notifier is used because data products are published below.
+        ProvenanceNotifier notifier = NotifierFactory.createProvenanceNotifier();
+        WorkflowTrackingContext context = notifier.createTrackingContext(null, BROKER_URL, myWorkflowID, myServiceID,
+                myNodeID, myTimestep);
+        String abstractIdAnnotation = AnnotationProps.newProps(AnnotationConsts.AbstractServiceID,
+                myServiceID.toString() + "-abstract").toString();
+        InvocationContext invocationContext = notifier.serviceInvoked(context, myInvoker,
+                "I (service1) was invoked by my invoker", abstractIdAnnotation);
+
+        notifier.dataConsumed(context, DATA_URI_1, DATA_URLS_1, "consuming a file");
+        notifier.dataConsumed(context, DATA_URI_2, DATA_URLS_2, "consuming another file");
+
+        // The actual work of the service would happen here.
+
+        notifier.dataProduced(context, DATA_URI_3, DATA_URLS_3, "produced some file", "<someXml/>");
+
+        boolean successResult = true;
+
+        if (!successResult) {
+            Failure failure = new Failure();
+            failure.header = XmlObject.Factory.parse("<faultHeader/>");
+            failure.body = XmlObject.Factory.parse("<faultBody>fault1</faultBody>");
+
+            // Report the fault, then acknowledge whether sending it worked.
+            notifier.sendingFault(context, invocationContext, failure.header, failure.body, "sending fault");
+            try {
+                notifier.sendingResponseSucceeded(context, invocationContext);
+            } catch (Exception ex) {
+                notifier.sendingResponseFailed(context, invocationContext, ex, "error sending response");
+            }
+            // The call is synchronous, so the response is simply returned.
+            return failure;
+        }
+
+        // Namespace declarations shared by the header and body elements.
+        String envelopeNamespaces = "xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:wsa=\"http://www.w3.org/2005/08/addressing\" xmlns:wsp=\"http://schemas.xmlsoap.org/ws/2002/12/policy\" xmlns:S=\"http://schemas.xmlsoap.org/soap/envelope/\">";
+        // Common prefix of the three output file locations.
+        String outputDir = "gsiftp://grid-hg.ncsa.teragrid.org//scratch/hperera/Wed_Aug_02_15_10_23_EST_2006_ARPS2WRF/outputData/";
+
+        Success success = new Success();
+        success.header = XmlObject.Factory.parse("<S:Header " + envelopeNamespaces
+                + "<wsa:To>http://129.79.246.253:3456</wsa:To>"
+                + "<wsa:RelatesTo>uuid:ee4d14d0-2262-11db-86d8-cd518af91949</wsa:RelatesTo>" + "</S:Header>");
+        success.body = XmlObject.Factory.parse("<S:Body " + envelopeNamespaces
+                + "<gfac:Run_OutputParams xmlns:gfac=\"http://org.apache.airavata/namespaces/2004/01/gFac\">"
+                + "<gfac:WRF_Ininitialization_Files>" + "<value>" + outputDir + "namelist.input</value>" + "<value>"
+                + outputDir + "wrfbdy_d01</value>" + "<value>" + outputDir + "wrfinput_d01</value>"
+                + "</gfac:WRF_Ininitialization_Files>" + "</gfac:Run_OutputParams>" + "</S:Body>");
+
+        // Report the result, then acknowledge whether sending it worked.
+        notifier.sendingResult(context, invocationContext, success.header, success.body, "sending success");
+        try {
+            notifier.sendingResponseSucceeded(context, invocationContext);
+        } catch (Exception ex) {
+            notifier.sendingResponseFailed(context, invocationContext, ex, "error sending response");
+        }
+
+        // The call is synchronous, so the response is simply returned.
+        return success;
+    }
+
+    /**
+     * Plays the role of service2: reports its invocation, the three data products it
+     * consumes and the two it produces, then builds a response and reports sending it.
+     * <p>
+     * The outcome is selected by the local {@code successResult} flag, which is fixed to
+     * {@code true}, so the fault path is present only for illustration.
+     *
+     * @param myInvoker
+     *            entity reported as the invoker of this service
+     * @param myWorkflowID
+     *            workflow ID handed to the tracking context
+     * @param myServiceID
+     *            service ID of this service; asserted to equal {@code SERVICE_2}
+     * @param myNodeID
+     *            node ID handed to the tracking context
+     * @param myTimestep
+     *            timestep handed to the tracking context
+     * @return the {@code Success} (or, on the fault path, {@code Failure}) that was reported
+     * @throws XmlException
+     *             if one of the XML literals cannot be parsed
+     */
+    public Object runService2(InvocationEntity myInvoker, URI myWorkflowID, URI myServiceID, String myNodeID,
+            int myTimestep) throws XmlException {
+
+        // The ID we were called with must be the ID this service believes it has.
+        assert SERVICE_2.equals(myServiceID);
+
+        // A provenance notifier is used because data products are published below.
+        ProvenanceNotifier notifier = NotifierFactory.createProvenanceNotifier();
+
+        // Request received.
+        WorkflowTrackingContext context = notifier.createTrackingContext(null, BROKER_URL, myWorkflowID, myServiceID,
+                myNodeID, myTimestep);
+        InvocationContext invocationContext = notifier.serviceInvoked(context, myInvoker,
+                "I (service2) was invoked by my invoker");
+
+        notifier.dataConsumed(context, DATA_URI_2, DATA_URLS_2, "consuming file used by service1");
+        notifier.dataConsumed(context, DATA_URI_3, DATA_URLS_3);
+        notifier.dataConsumed(context, DATA_URI_4, DATA_URLS_4, null, "<dataXml>boo</dataXml>");
+
+        // The actual work of the service would happen here.
+
+        notifier.dataProduced(context, DATA_URI_5, DATA_URLS_5);
+        notifier.dataProduced(context, DATA_URI_6, DATA_URLS_6);
+
+        boolean successResult = true;
+
+        if (!successResult) {
+            Failure failure = new Failure();
+            failure.header = XmlObject.Factory.parse("<faultHeader/>");
+            failure.body = XmlObject.Factory.parse("<faultBody>fault2</faultBody>");
+
+            // Report the fault, then acknowledge whether sending it worked.
+            notifier.sendingFault(context, invocationContext, failure.header, failure.body, "sending fault");
+            try {
+                notifier.sendingResponseSucceeded(context, invocationContext);
+            } catch (Exception ex) {
+                notifier.sendingResponseFailed(context, invocationContext, ex, "error sending response");
+            }
+            // The call is synchronous, so the response is simply returned.
+            return failure;
+        }
+
+        Success success = new Success();
+        success.header = XmlObject.Factory.parse("<resultHeader/>");
+        success.body = XmlObject.Factory.parse("<resultBody>output2,output3</resultBody>");
+
+        // Report the result, then acknowledge whether sending it worked.
+        notifier.sendingResult(context, invocationContext, success.header, success.body, "sending success");
+        try {
+            notifier.sendingResponseSucceeded(context, invocationContext);
+        } catch (Exception ex) {
+            notifier.sendingResponseFailed(context, invocationContext, ex, "error sending response");
+        }
+
+        // The call is synchronous, so the response is simply returned.
+        return success;
+    }
+
+ // used to override notifier creation to use an external notifier, instead
+ // of setting the
+ // properties to create it in the main method
+ private Notifier notifier;
+
+    /**
+     * Drives the whole sample: announces that the workflow and its two services are
+     * initialized, runs workflow1 with the root service as its invoker, and then announces
+     * that the workflow and the services have terminated.
+     *
+     * @throws Exception
+     *             if creating a notifier, sending a notification or running the workflow fails
+     */
+    public void runSample() throws Exception {
+        notifier = NotifierFactory.createNotifier();
+        WorkflowTrackingContext context = notifier.createTrackingContext(null, BROKER_URL, WORKFLOW_1, SERVICE_0, null,
+                null);
+
+        // Announce the workflow and service instances.
+        WorkflowNotifier startNotifier = NotifierFactory.createWorkflowNotifier();
+        startNotifier.workflowInitialized(context, WORKFLOW_1, "Workflow ready to start",
+                "<wfInfo>some annotation about the workflow</wfInfo>",
+                "<dummy>just to check annotations list</dummy>");
+        startNotifier.serviceInitialized(context, SERVICE_1);
+        startNotifier.serviceInitialized(context, SERVICE_2);
+
+        // Run the workflow on behalf of the root service.
+        GenericNotifier entityFactory = NotifierFactory.createGenericNotifier();
+        InvocationEntity initiatingService = entityFactory.createEntity(null, SERVICE_0, null, null);
+        runWorkflow1(initiatingService, null, WORKFLOW_1, null, null);
+
+        // Announce termination of the workflow and service instances.
+        WorkflowNotifier endNotifier = NotifierFactory.createWorkflowNotifier();
+        endNotifier.workflowTerminated(context, WORKFLOW_1);
+        endNotifier.serviceTerminated(context, SERVICE_1);
+        endNotifier.serviceTerminated(context, SERVICE_2);
+    }
+
+    /**
+     * Starts a consumer service that pretty-prints every notification it receives,
+     * subscribes it to the broker at {@code BROKER_URL}, runs the sample workflow and then
+     * keeps listening for a bounded time so the notifications can be printed.
+     * <p>
+     * The original version finished with {@code new Semaphore(0).acquire()}, which can never
+     * return because nothing releases a permit, so the test never completed. It now waits
+     * for a fixed window instead.
+     *
+     * @throws Exception
+     *             if the broker client, the subscription or the sample run fails, or if the
+     *             thread is interrupted while waiting
+     */
+    @Test
+    public void testSimpleTest() throws Exception {
+
+        // How long to keep listening after the workflow has run.
+        // NOTE(review): 30 s is an arbitrary choice; adjust to the broker's delivery latency.
+        final long listenMillis = 30000L;
+
+        System.out.println("USAGE: run org.apache.airavata.workflow.tracking.samples.workflow.SimpleWorkflowExecution "
+                + " [WSMessaging broker URL (default: print to stdout)]");
+
+        ConsumerNotificationHandler handler = new ConsumerNotificationHandler() {
+
+            public void handleNotification(SOAPEnvelope msgEnvelope) {
+                try {
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    XMLPrettyPrinter.prettify(msgEnvelope, out);
+                    System.out.println(new String(out.toByteArray()));
+                } catch (Exception e) {
+                    // Printing is best effort; a message that cannot be formatted is only logged.
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        WseMsgBrokerClient api = new WseMsgBrokerClient();
+        api.init(BROKER_URL);
+        int consumerPort = 5555;
+
+        String[] consumerServiceEprs = api.startConsumerService(consumerPort, handler);
+
+        api.subscribe(consumerServiceEprs[0], ">", null);
+
+        String topic = "Foo";
+        // NOTE(review): props is assembled here but is not passed to anything in this method.
+        ConstructorProps props = ConstructorProps
+                .newProps(ConstructorConsts.ENABLE_ASYNC_PUBLISH, "false")
+                .set(ConstructorConsts.ENABLE_BATCH_PROVENANCE, "true")
+                .set(ConstructorConsts.ANNOTATIONS,
+                        AnnotationProps.newProps(AnnotationConsts.ExperimentID,
+                                "experiment-id-" + System.currentTimeMillis()).set(AnnotationConsts.UserDN,
+                                "/O=IU/OU=Extreme Lab/CN=drlead"));
+        if (BROKER_URL != null) {
+            // A broker location is known: address it directly.
+            EndpointReference brokerEpr = api.createEndpointReference(BROKER_URL, topic);
+
+            props.set(ConstructorConsts.BROKER_EPR, brokerEpr.getAddress());
+        } else {
+            props.set(ConstructorConsts.PUBLISHER_IMPL_CLASS,
+                    "org.apache.airavata.workflow.tracking.impl.publish.LoopbackPublisher");
+            props.set(ConstructorConsts.TOPIC, topic);
+        }
+
+        System.out.println(ConstructorConsts.ANNOTATIONS);
+        System.out.println(ConstructorConsts.KARMA_IMPL);
+        runSample();
+
+        // Bounded wait instead of blocking forever, so the test run can finish.
+        Thread.sleep(listenMillis);
+    }
+
+}
Added: incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/WorkflowNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/WorkflowNotificationListener.java?rev=1189967&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/WorkflowNotificationListener.java (added)
+++ incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/samples/workflow/WorkflowNotificationListener.java Thu Oct 27 19:41:43 2011
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.test.suite.workflowtracking.tests.samples.workflow;
+
+import java.net.URL;
+import java.rmi.RemoteException;
+import java.util.Properties;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.LeadNotificationManager;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.client.Subscription;
+import org.apache.airavata.test.suite.workflowtracking.tests.util.TestConfigKeys;
+import org.apache.airavata.workflow.tracking.types.WorkflowTerminatedDocument;
+import org.apache.xmlbeans.XmlException;
+import org.apache.xmlbeans.XmlObject;
+import org.junit.*;
+
+public class WorkflowNotificationListener implements Callback {
+
+ Subscription subscription;
+ Properties configs = new Properties();
+ String BROKER_URL;
+ String MESSAGEBOX_URL;
+ int consumerPort;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ URL configURL = ClassLoader.getSystemResource(TestConfigKeys.CONFIG_FILE_NAME);
+ configs.load(configURL.openStream());
+ BROKER_URL = configs.getProperty(TestConfigKeys.BROKER_EVENTING_SERVICE_EPR);
+ MESSAGEBOX_URL = configs.getProperty(TestConfigKeys.MSGBOX_SERVICE_EPR);
+ consumerPort = Integer.parseInt(configs.getProperty(TestConfigKeys.CONSUMER_PORT));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public WorkflowNotificationListener() {
+ }
+
+ /**
+ * Method deliverMessage is called when a Lead Message is received on the subscribed topic.
+ *
+ * @param topic
+ * the topic to which this message was sent. This can also be retrieved from the messageObj XMlObject
+ * directly after typecasting.
+ * @param messageObj
+ * the XmlObject representing one of the LeadMessages, This needs to be typecast to the correct message
+ * type before being used.
+ *
+ */
+ public void deliverMessage(String topic, NotificationType type, XmlObject messageObj) {
+
+ System.out.println("Received Notification Type [" + type + "] on topic [" + topic + "]\n" + messageObj
+ + "\n---");
+
+ if (type == NotificationType.WorkflowTerminated) {
+ System.out.println("Workflow terminated. Unsubscribing...");
+ WorkflowTerminatedDocument obj = (WorkflowTerminatedDocument) messageObj;
+ try {
+ obj.getWorkflowTerminated().getAnnotation()
+ .set(XmlObject.Factory.parse("<something>someval</something>"));
+ System.out.println(obj.toString());
+ } catch (XmlException e) {
+ e.printStackTrace();
+ }
+ try {
+ subscription.destroy();
+ } catch (RemoteException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void testWokflowNotificationListener() throws Exception {
+
+ String topic = "somerandomtopic";
+
+ WorkflowNotificationListener subscriber = new WorkflowNotificationListener();
+ boolean useMessageBox = true;
+ if (!useMessageBox) {
+ subscription = LeadNotificationManager.createSubscription(BROKER_URL, topic, subscriber, consumerPort);
+ } else {
+ subscription = LeadNotificationManager.createMessageBoxSubscription(MESSAGEBOX_URL, BROKER_URL, topic,
+ null, subscriber);
+ }
+ System.out.println("Subscribing to broker: " + BROKER_URL);
+ System.out.println("Started listening on topic: " + subscription.getTopic());
+ while (true) {
+ Thread.sleep(10000);
+ }
+
+ }
+
+}
Added: incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/CommonUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/CommonUtils.java?rev=1189967&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/CommonUtils.java (added)
+++ incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/CommonUtils.java Thu Oct 27 19:41:43 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.test.suite.workflowtracking.tests.util;
+
+/** Shared fixtures for the workflow-tracking tests. */
+public class CommonUtils {
+    /** A serialized {@code workflowInitialized} notification, kept as one XML string. */
+    public static final String WORKFLOW_INITIALIZED_NOTIFICATION =
+            "<wor:workflowInitialized xmlns:wor=\"http://lead.extreme.indiana.edu/namespaces/2006/06/workflow_tracking\">"
+            + "<wor:notificationSource wor:serviceID=\"http://tempuri.org/workflow1\" />"
+            + "<wor:timestamp>2006-06-04T00:53:41.296-04:00</wor:timestamp>"
+            + "</wor:workflowInitialized>";
+}
Added: incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/SubscriberThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/SubscriberThread.java?rev=1189967&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/SubscriberThread.java (added)
+++ incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/SubscriberThread.java Thu Oct 27 19:41:43 2011
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.test.suite.workflowtracking.tests.util;
+
+import java.rmi.RemoteException;
+
+import org.apache.airavata.workflow.tracking.client.Callback;
+import org.apache.airavata.workflow.tracking.client.LeadNotificationManager;
+import org.apache.airavata.workflow.tracking.client.NotificationType;
+import org.apache.airavata.workflow.tracking.client.Subscription;
+import org.apache.airavata.test.suite.workflowtracking.tests.MultipleSubscriptionTest;
+import org.apache.airavata.test.suite.workflowtracking.tests.ThreadMessagePassingCallback;
+import org.apache.xmlbeans.XmlObject;
+
+public class SubscriberThread extends Thread {
+
+ private ThreadMessagePassingCallback callback;
+
+ public static int count = 0;
+
+ private int subCount = 0;
+
+ private Subscription subscription;
+ private String brokerURL;
+ private String topic;
+ private int consumerServerPort;
+
+ public SubscriberThread(ThreadMessagePassingCallback callback, String brokerURL, String topic,
+ int consumerServerPort) {
+ this.callback = callback;
+ this.brokerURL = brokerURL;
+ this.topic = topic;
+ this.consumerServerPort = consumerServerPort;
+ }
+
+ @Override
+ public void run() {
+ try {
+ subscription = LeadNotificationManager.createSubscription(brokerURL, topic, new Callback() {
+
+ public void deliverMessage(String topic, NotificationType type, XmlObject messageObj) {
+
+ subCount++;
+ count++;
+ System.out.println("Subscription received " + subCount + "th notification of type:" + type
+ + " Total is :" + count);
+ assert (type == NotificationType.WorkflowInitialized);
+
+ if (subCount == MultipleSubscriptionTest.NOTIFICATIONS_PUBLISHED) {
+ try {
+ subscription.destroy();
+ } catch (RemoteException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ callback.done();
+ }
+ }
+ }, consumerServerPort);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ while (true) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
Added: incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/TestConfigKeys.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/TestConfigKeys.java?rev=1189967&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/TestConfigKeys.java (added)
+++ incubator/airavata/trunk/modules/test-suite/src/test/java/org/apache/airavata/test/suite/workflowtracking/tests/util/TestConfigKeys.java Thu Oct 27 19:41:43 2011
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.test.suite.workflowtracking.tests.util;
+
+/**
+ * Names used by the workflow-tracking tests to locate their configuration: the
+ * properties file itself and the keys looked up in it.
+ */
+public class TestConfigKeys {
+
+    /** Name of the properties file, resolved from the classpath. */
+    public static final String CONFIG_FILE_NAME = "unit_test.properties";
+
+    // Service endpoint keys.
+    public static final String BROKER_EVENTING_SERVICE_EPR = "broker.eventing.service.epr";
+    public static final String BROKER_NOTIFICATIONS_SERVICE_EPR = "broker.notification.service.epr";
+    public static final String MSGBOX_SERVICE_EPR = "msgbox.service.epr";
+
+    // Consumer keys.
+    public static final String CONSUMER_EPR = "consumer.location";
+    public static final String CONSUMER_PORT = "consumer.port";
+
+    // Topic keys.
+    // NOTE(review): "SIME" looks like a typo for "SIMPLE"; the name is kept so existing references still compile.
+    public static final String TOPIC_SIME = "topic.simple";
+    public static final String TOPIC_XPATH = "topic.xpath";
+
+    // Key holding the Axis2 repository setting.
+    public static final String AXIS2_REPO = "axis2.repo";
+}