You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/07/03 17:51:46 UTC

svn commit: r1142453 [9/12] - in /incubator/airavata/ws-messaging/trunk/messagebroker: ./ .settings/ customLibs/ customLibs/activeMQ/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/airavata/ src/main...

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeSet;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgVersion;
+
+/**
+ * Process-wide runtime counters for the message broker, together with an HTML rendering of them.
+ *
+ * <p>
+ * All state lives in public static fields. The {@code addNew...} methods, {@link #addFailedConsumerURL(String)},
+ * {@link #setStartUpTime()} and {@link #getHtmlString()} synchronize on this class; code that writes the public
+ * fields directly is not covered by that lock.
+ */
+public class RunTimeStatistics {
+    public static long totalMessageSize = 0;
+    public static long totalReceivedNotification = 0;
+    public static long totalSentOutNotification = 0;
+    public static long totalFailedNotification = 0;
+    public static long totalSubscriptions = 0;
+    public static long totalSubscriptionsAtStartUp = 0;
+    public static long totalUnSubscriptions = 0;
+    /** Smallest message size seen so far; {@code Long.MAX_VALUE} until the first message is recorded. */
+    public static long minMessageSize = Long.MAX_VALUE;
+    public static long maxMessageSize = 0;
+    public static String startUpTime = "";
+    public static long totalSuccessfulDeliveryTime = 0;
+    public static long totalFailedDeliveryTime = 0;
+    /** {@code Long.MAX_VALUE} until the first successful delivery is recorded. */
+    public static long minSuccessfulDeliveryTime = Long.MAX_VALUE;
+    public static long maxSuccessfulDeliveryTime = 0;
+    /** {@code Long.MAX_VALUE} until the first failed delivery is recorded. */
+    public static long minFailedDeliveryTime = Long.MAX_VALUE;
+    public static long maxFailedDeliveryTime = 0;
+    /** Number of recorded failures per consumer URL. */
+    public static HashMap<String, Integer> failConsumerList = new HashMap<String, Integer>();
+
+    private static long startUpTimeInMillis;
+
+    /**
+     * Records one received notification and folds its size into the min/max/total statistics.
+     *
+     * @param size
+     *            size of the received message
+     */
+    public static synchronized void addNewNotificationMessageSize(int size) {
+        if (size < minMessageSize) {
+            minMessageSize = size;
+        }
+        if (size > maxMessageSize) {
+            maxMessageSize = size;
+        }
+        totalMessageSize += size;
+        totalReceivedNotification++;
+    }
+
+    /**
+     * Records one successfully delivered notification and its delivery time.
+     *
+     * @param deliveryTime
+     *            time the delivery took
+     */
+    public static synchronized void addNewSuccessfulDeliverTime(long deliveryTime) {
+        if (deliveryTime < minSuccessfulDeliveryTime) {
+            minSuccessfulDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxSuccessfulDeliveryTime) {
+            maxSuccessfulDeliveryTime = deliveryTime;
+        }
+        totalSuccessfulDeliveryTime += deliveryTime;
+        totalSentOutNotification++;
+    }
+
+    /**
+     * Records one failed delivery attempt and the time spent on it.
+     *
+     * @param deliveryTime
+     *            time spent before the delivery was given up
+     */
+    public static synchronized void addNewFailedDeliverTime(long deliveryTime) {
+        if (deliveryTime < minFailedDeliveryTime) {
+            minFailedDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxFailedDeliveryTime) {
+            maxFailedDeliveryTime = deliveryTime;
+        }
+        totalFailedDeliveryTime += deliveryTime;
+        totalFailedNotification++;
+    }
+
+    /**
+     * Increments the failure count kept for the given consumer URL.
+     *
+     * @param url
+     *            consumer URL that could not be reached
+     */
+    public static synchronized void addFailedConsumerURL(String url) {
+        Integer previousCount = failConsumerList.get(url);
+        if (previousCount == null) {
+            failConsumerList.put(url, 1);
+        } else {
+            failConsumerList.put(url, previousCount + 1);
+        }
+    }
+
+    /**
+     * Stores the current time as the service start-up time, both as an xsd:dateTime string and in milliseconds.
+     */
+    public static synchronized void setStartUpTime() {
+        Date currentDate = new Date(); // Current date
+        startUpTime = CommonRoutines.getXsdDateTime(currentDate);
+        startUpTimeInMillis = currentDate.getTime();
+    }
+
+    /**
+     * Renders all statistics as an HTML fragment.
+     *
+     * <p>
+     * The method holds the class lock so that the failed-consumer map cannot be modified by the
+     * {@code add...} methods while it is being copied. Minimum values are shown as 0 while no sample has been
+     * recorded, the start time is reported in seconds as its label states, and consumer URLs are HTML-escaped
+     * because they originate from subscription requests.
+     *
+     * @return the HTML fragment
+     */
+    public static synchronized String getHtmlString() {
+        StringBuilder html = new StringBuilder();
+
+        html.append("<p>Total incoming message number: <span class=\"xml-requests-count\">")
+                .append(totalReceivedNotification).append("</span><br />\n");
+        html.append("Total successful outgoing message number: ").append(totalSentOutNotification)
+                .append("<br>\n");
+        html.append("Total unreachable outgoing message number: ").append(totalFailedNotification)
+                .append("<br>\n");
+        html.append("Total subscriptions requested: ").append(totalSubscriptions).append("(+")
+                .append(totalSubscriptionsAtStartUp).append(" startUp)<br>\n");
+        html.append("Total Unsubscriptions requested: ").append(totalUnSubscriptions).append("<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average message size: ").append(average(totalMessageSize, totalReceivedNotification))
+                .append(" bytes<br>\n");
+        html.append("Max message size: ").append(maxMessageSize).append(" bytes<br>\n");
+        html.append("Min message size: ").append(minOrZero(minMessageSize)).append(" bytes<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average Successful Delivery Time: ")
+                .append(average(totalSuccessfulDeliveryTime, totalSentOutNotification)).append(" ms<br>\n");
+        html.append("Max Successful Delivery Time: ").append(maxSuccessfulDeliveryTime).append(" ms<br>\n");
+        html.append("Min Successful Delivery Time: ").append(minOrZero(minSuccessfulDeliveryTime))
+                .append(" ms<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average Unreachable Delivery Time: ")
+                .append(average(totalFailedDeliveryTime, totalFailedNotification)).append(" ms<br>\n");
+        html.append("Max Unreachable Delivery Time: ").append(maxFailedDeliveryTime).append(" ms<br>\n");
+        html.append("Min Unreachable Delivery Time: ").append(minOrZero(minFailedDeliveryTime))
+                .append(" ms<br>\n");
+        html.append("</p>\n");
+
+        // The span is labelled in seconds, so convert the stored millisecond timestamp.
+        html.append("<p>Service started at: ").append(startUpTime).append(" (<span class=\"starttime-seconds\">")
+                .append(startUpTimeInMillis / 1000).append("</span> [seconds] since UNIX epoch)<br />\n");
+
+        html.append("Version: <span class=\"service-name\">").append(WsmgVersion.getImplementationVersion())
+                .append("</span></p>\n");
+
+        html.append("<p>Total unreachable consumerUrl: ").append(failConsumerList.size()).append(" <br>\n");
+        // Sorted copy of the keys so the listing is stable between calls.
+        TreeSet<String> consumerUrlList = new TreeSet<String>(failConsumerList.keySet());
+        for (String url : consumerUrlList) {
+            int failedCount = failConsumerList.get(url);
+            html.append("  ").append(escapeHtml(url)).append(" -->").append(failedCount).append(" <br>\n");
+        }
+        html.append("</p>\n");
+        return html.toString();
+    }
+
+    /** Returns {@code total / count}, or 0 when nothing has been counted yet. */
+    private static long average(long total, long count) {
+        return count == 0 ? 0 : total / count;
+    }
+
+    /** Maps the "no sample yet" sentinel {@code Long.MAX_VALUE} to 0 for display. */
+    private static long minOrZero(long minimum) {
+        return minimum == Long.MAX_VALUE ? 0 : minimum;
+    }
+
+    /** Escapes the characters that are significant in HTML text and attribute values. */
+    private static String escapeHtml(String text) {
+        StringBuilder escaped = new StringBuilder(text.length() + 16);
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            switch (c) {
+            case '&':
+                escaped.append("&amp;");
+                break;
+            case '<':
+                escaped.append("&lt;");
+                break;
+            case '>':
+                escaped.append("&gt;");
+                break;
+            case '"':
+                escaped.append("&quot;");
+                break;
+            case '\'':
+                escaped.append("&#39;");
+                break;
+            default:
+                escaped.append(c);
+            }
+        }
+        return escaped.toString();
+    }
+
+    /**
+     * Entry point kept for compatibility; it does nothing.
+     *
+     * @param args
+     *            ignored
+     */
+    public static void main(String[] args) {
+        // intentionally empty
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Runnable that samples a {@link Counter} once per second and prints the sequence number, the counter value,
+ * the change since the previous sample, the configured comment and the counter's other-value string to
+ * standard output.
+ *
+ * <p>
+ * Output is suppressed once the counter has been unchanged for three consecutive samples, and resumes when
+ * it increases again. The loop ends when the executing thread is interrupted.
+ */
+public class TimerThread implements Runnable {
+    Counter counter;
+
+    /** Counter value read at the most recent sample. */
+    long counterValue = 0;
+
+    /** Number of samples taken so far. */
+    long seqNum = 0;
+
+    /** Text appended to every printed line. */
+    String comment = "";
+
+    public TimerThread(Counter counter) {
+        this.counter = counter;
+    }
+
+    public TimerThread(Counter counter, String comment) {
+        this.counter = counter;
+        this.comment = comment;
+    }
+
+    public void run() {
+        final long interval = 1000;
+        long lastCounter = 0;
+        long idleCount = 0;
+        // Start on a whole-second boundary one to two seconds from now so that several timer threads
+        // launched at about the same time sample together.
+        long currentTime = System.currentTimeMillis();
+        long launchTime = ((currentTime + 2000) / 1000) * 1000;
+        long sleepTime = launchTime - currentTime;
+        System.out.println("launchTime=" + launchTime + " SleepTime=" + sleepTime);
+        try {
+            Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+            // Interruption is the only way to stop this thread: keep the flag set and exit.
+            Thread.currentThread().interrupt();
+            return;
+        }
+        while (!Thread.currentThread().isInterrupted()) {
+            currentTime = System.currentTimeMillis();
+            counterValue = counter.getCounterValue();
+            long receivedCount = counterValue - lastCounter;
+            lastCounter = counterValue;
+            if (receivedCount == 0) {
+                idleCount++;
+            } else {
+                idleCount = 0;
+            }
+            if (receivedCount > 0 || (receivedCount == 0 && idleCount < 3)) {
+                // Print the sampled value so that the total and the delta on one line always agree.
+                System.out.println(seqNum + " " + counterValue + " " + receivedCount + comment
+                        + counter.getOtherValueString());
+            }
+            seqNum++;
+            // Schedule against the fixed launch grid rather than "now" so that the period does not drift.
+            launchTime = launchTime + interval;
+            sleepTime = launchTime - currentTime;
+            if (sleepTime < 0) {
+                sleepTime = 0;
+            }
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+public enum WsEventingOperations {
+
+    RENEW("renew"), PUBLISH("publish"), GET_STATUS("getStatus"), SUBSCRIPTION_END("subscriptionEnd"), SUBSCRIBE(
+            "subscribe"), UNSUBSCRIBE("unsubscribe");
+
+    private final String name;
+
+    private WsEventingOperations(String n) {
+        name = n;
+    }
+
+    public String toString() {
+        return name;
+    }
+
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    public static WsEventingOperations valueFrom(String s) {
+        for (WsEventingOperations status : WsEventingOperations.values()) {
+            if (status.toString().equalsIgnoreCase(s)) {
+                return status;
+            }
+
+        }
+
+        throw new RuntimeException("invalid WsEventingOperation:- " + s);
+
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Operations of the WS-Notification service, each paired with the operation name used on the wire.
+ */
+public enum WsNotificationOperations {
+
+    NOTIFY("notify"),
+    SUBSCRIBE("subscribe"),
+    GET_CURRENT_MSG("getCurrentMessage"),
+    PAUSE_REQUEST("pause"),
+    RESUME_REQUEST("resume"),
+    PAUSE_SUBSCRIPTION("pauseSubscription"),
+    RESUME_SUBSCRIPTION("resumeSubscription"),
+    REGISTER_PUBLISHER("registerPublisher"),
+    UNSUBSCRIBE("unsubscribe");
+
+    /**
+     * Misspelled name that PAUSE_REQUEST used to carry; still accepted by {@link #valueFrom(String)} so that
+     * existing callers keep working.
+     */
+    private static final String LEGACY_PAUSE_NAME = "gause";
+
+    private final String name;
+
+    private WsNotificationOperations(String n) {
+        name = n;
+    }
+
+    /**
+     * @return the operation name of this constant
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Case-sensitive comparison of this constant's operation name with the given string. This is an overload
+     * of, not an override of, {@link Object#equals(Object)}.
+     *
+     * @param s
+     *            operation name to compare with; may be null
+     * @return true if {@code s} equals the operation name exactly
+     */
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    /**
+     * Looks up the constant whose operation name matches the given string, ignoring case.
+     *
+     * @param s
+     *            operation name to look up
+     * @return the matching constant
+     * @throws RuntimeException
+     *             if no constant matches
+     */
+    public static WsNotificationOperations valueFrom(String s) {
+        for (WsNotificationOperations status : WsNotificationOperations.values()) {
+            if (status.toString().equalsIgnoreCase(s)) {
+                return status;
+            }
+        }
+
+        if (LEGACY_PAUSE_NAME.equalsIgnoreCase(s)) {
+            return PAUSE_REQUEST;
+        }
+
+        throw new RuntimeException("invalid Ws notification Operation:- " + s);
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsmgUtil.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsmgUtil.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsmgUtil.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsmgUtil.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+
+public class WsmgUtil {
+    private static final String BODY = "Body";
+
+    private static final String ACTION = "Action";
+
+    private static final String MESSAGE_ID = "MessageID";
+
+    private static final String XMLNS_WIDGET = "xmlns:widget";
+
+    private static final String DIALECT = "Dialect";
+
+    private static final String TOPIC = "Topic";
+
+    private static final String TO = "To";
+
+    private static final String HEADER = "Header";
+
+    private static final String ENVELOPE = "Envelope";
+
+    private static final String WSNT = "wsnt";
+
+    private static final String XSI = "xsi";
+
+    private static final String WA48 = "wa48";
+
+    private static final String S = "S";
+
+    private static final String HTTP_WIDGETS_COM = "http://widgets.com";
+
+    private static final String HTTP_WWW_IBM_COM_XMLNS_STDWIP_WEB_SERVICES_WS_TOPICS_TOPIC_EXPRESSION_SIMPLE = "http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple";
+
+    private static final String HTTP_WWW_IBM_COM_XMLNS_STDWIP_WEB_SERVICES_WS_BASE_NOTIFICATION = "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification";
+
+    private static final String HTTP_WWW_W3_ORG_2001_XMLSCHEMA_INSTANCE = "http://www.w3.org/2001/XMLSchema-instance";
+
+    private static final String HTTP_SCHEMAS_XMLSOAP_ORG_WS_2004_08_ADDRESSING = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+
+    private static final String HTTP_SCHEMAS_XMLSOAP_ORG_SOAP_ENVELOPE = "http://schemas.xmlsoap.org/soap/envelope/";
+
+    public static String formatURLString(String url) {
+
+        if (url == null) {
+            throw new IllegalArgumentException("url can't be null");
+        }
+
+        if (url.indexOf("//") < 0) {
+            url = "http://" + url; // use default http
+        }
+        return url;
+    }
+
+    public static boolean sameStringValue(String stringA, String stringB) {
+        if (stringA == null) {
+            if (stringB == null) {
+                return true;
+            }
+            return false;
+
+        }
+        // StringA!=null
+        if (stringB == null)
+            return false;
+        if (stringA.compareTo(stringB) == 0) {
+            return true;
+        }
+        return false;
+
+    }
+
+    public static String getHostIP() {
+        InetAddress localAddress = null;
+        try {
+            localAddress = InetAddress.getLocalHost();
+        } catch (UnknownHostException ex) {
+            System.out.println("Error - unable to resolve localhost");
+        }
+        // Use IP address since DNS entry cannot update the laptop's entry
+        // promptly
+        String hostIP = localAddress.getHostAddress();
+        return hostIP;
+    }
+
+    public static String getTopicLocalString(String filterText) {
+
+        if (filterText == null)
+            throw new IllegalArgumentException("filter text can't be null");
+
+        String localName = null;
+
+        int pos = filterText.indexOf(':');
+
+        if (pos != -1) {
+            localName = filterText.substring(pos + 1);
+
+        } else {
+
+            localName = filterText;
+        }
+
+        return localName;
+    }
+
+    /**
+     * 
+     * @return localString
+     * @throws AxisFault
+     */
+    public static String getXPathString(OMElement xpathEl) throws AxisFault {
+
+        if (xpathEl == null) {
+            throw new IllegalArgumentException("xpath element can't be null");
+        }
+
+        OMAttribute dialectAttribute = xpathEl.getAttribute(new QName("Dialect"));
+
+        if (dialectAttribute == null) {
+            dialectAttribute = xpathEl.getAttribute(new QName("DIALECT"));
+
+        }
+        if (dialectAttribute == null) {
+            throw new AxisFault("dialect is required for subscribe");
+        }
+        String dialectString = dialectAttribute.getAttributeValue();
+        if (!dialectString.equals(WsmgCommonConstants.XPATH_DIALECT)) {
+            // System.out.println("***Unkown dialect: " + dialectString);
+            throw new AxisFault("Unkown dialect: " + dialectString);
+        }
+        String xpathLocalString = xpathEl.getText();
+        return xpathLocalString;
+    }
+
+    public static String getTopicFromRequestPath(String topicPath) {
+        if (topicPath == null)
+            return null;
+        if (topicPath.length() == 0)
+            return null;
+        if (topicPath.startsWith("/")) {
+            topicPath = topicPath.substring(1);
+            if (topicPath.length() == 0)
+                return null;
+        }
+
+        String ret = null;
+
+        int index = topicPath.indexOf(WsmgCommonConstants.TOPIC_PREFIX);
+        if (index >= 0) {
+
+            ret = topicPath.substring(index + WsmgCommonConstants.TOPIC_PREFIX.length());
+
+            if (ret.length() == 0) {
+                ret = null;
+            }
+
+        }
+
+        return ret;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/test/TestUtilServer.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/test/TestUtilServer.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/test/TestUtilServer.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/test/TestUtilServer.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util.test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.context.ServiceGroupContext;
+import org.apache.axis2.deployment.DeploymentEngine;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.InOutAxisOperation;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.engine.ListenerManager;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.engine.ServiceLifeCycle;
+import org.apache.axis2.transport.http.SimpleHTTPServer;
+
+public class TestUtilServer {
+    // Number of completed start(...) calls; a server is only created while this is still 0.
+    private static int count = 0;
+
+    // HTTP server instance created by start(...); null until then.
+    private static SimpleHTTPServer receiver;
+
+    // Port the test HTTP server is created with.
+    public static final int TESTING_PORT = 5555;
+
+    // NOTE(review): not referenced in the visible part of this class; presumably the text tests use when
+    // provoking a failure on purpose - confirm against callers.
+    public static final String FAILURE_MESSAGE = "Intentional Failure";
+
+    public static synchronized void deployService(AxisService service) throws AxisFault {
+        receiver.getConfigurationContext().getAxisConfiguration().addService(service);
+    }
+
+    public static synchronized void unDeployService(QName service) throws AxisFault {
+        receiver.getConfigurationContext().getAxisConfiguration().removeService(service.getLocalPart());
+    }
+
+    public static synchronized void unDeployClientService() throws AxisFault {
+        if (receiver.getConfigurationContext().getAxisConfiguration() != null) {
+            receiver.getConfigurationContext().getAxisConfiguration().removeService("AnonymousService");
+        }
+    }
+
+    public static synchronized void start() throws Exception {
+        start(prefixBaseDirectory(Constants.TESTING_REPOSITORY));
+    }
+
+    /**
+     * Starts the test HTTP server on {@link #TESTING_PORT} for the given repository and registers it as the
+     * HTTP transport listener. The server is only created on the first call; every call then waits two
+     * seconds and increments the start count.
+     *
+     * @param repository
+     *            path of the Axis2 repository to load
+     * @throws Exception
+     *             if the configuration context cannot be created, the server or listener fails to start (as
+     *             an {@link AxisFault}), or the wait is interrupted
+     */
+    public static synchronized void start(String repository) throws Exception {
+        if (count == 0) {
+            ConfigurationContext er = getNewConfigurationContext(repository);
+
+            receiver = new SimpleHTTPServer(er, TESTING_PORT);
+
+            try {
+                receiver.start();
+                ListenerManager listenerManager = er.getListenerManager();
+                TransportInDescription trsIn = new TransportInDescription(Constants.TRANSPORT_HTTP);
+                trsIn.setReceiver(receiver);
+                if (listenerManager == null) {
+                    listenerManager = new ListenerManager();
+                    listenerManager.init(er);
+                }
+                listenerManager.addListener(trsIn, true);
+                System.out.print("Server started on port " + TESTING_PORT + ".....");
+            } catch (Exception e) {
+                // Report the failure, as start(String, String) does, instead of counting a server that
+                // never came up as started.
+                throw AxisFault.makeFault(e);
+            }
+        }
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e1) {
+            // Keep the interrupt visible to the caller's thread after converting it to a fault.
+            Thread.currentThread().interrupt();
+            throw new AxisFault("Thread interuptted", e1);
+        }
+
+        count++;
+    }
+
+    /**
+     * Starts the test HTTP server on {@link #TESTING_PORT} using the given repository and axis2.xml, waits
+     * two seconds and then deploys the broker services via {@link #startBroker()}. All of this only happens
+     * on the first call; every call increments the start count.
+     *
+     * @param repository
+     *            path of the Axis2 repository to load
+     * @param axis2xml
+     *            path of the axis2.xml configuration file
+     * @throws Exception
+     *             if the server or the broker fails to start, or the wait is interrupted
+     */
+    public static synchronized void start(String repository, String axis2xml) throws Exception {
+        if (count == 0) {
+            ConfigurationContext er = getNewConfigurationContext(repository, axis2xml);
+
+            receiver = new SimpleHTTPServer(er, TESTING_PORT);
+
+            try {
+                receiver.start();
+                System.out.print("Server started on port " + TESTING_PORT + ".....");
+            } catch (Exception e) {
+                throw AxisFault.makeFault(e);
+            }
+
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException e1) {
+                // Keep the interrupt visible to the caller's thread after converting it to a fault.
+                Thread.currentThread().interrupt();
+                throw new AxisFault("Thread interuptted", e1);
+            }
+            startBroker();
+        }
+        count++;
+    }
+
+    /**
+     * Deploys the notification service and then the eventing service on the test server. Both are given the
+     * broker configuration-file parameter and share one {@link BrokerServiceLifeCycle}, whose
+     * {@code startUp} is invoked once per service.
+     *
+     * @throws Exception
+     *             if deploying or starting either service fails
+     */
+    public static void startBroker() throws Exception {
+
+        final String paramName = "configuration.file.name";
+        final String paramValue = "org.apache.airavata.wsmg.broker.properties";
+
+        ServiceLifeCycle lifeCycle = new BrokerServiceLifeCycle();
+
+        // WS-Notification endpoint first
+        AxisService wsNotification = getNotificationService();
+        wsNotification.addParameter(paramName, paramValue);
+        wsNotification.setServiceLifeCycle(lifeCycle);
+        deployService(wsNotification);
+        lifeCycle.startUp(getConfigurationContext(), wsNotification);
+
+        // then the WS-Eventing endpoint
+        AxisService wsEventing = getEventingService();
+        wsEventing.addParameter(paramName, paramValue);
+        wsEventing.setServiceLifeCycle(lifeCycle);
+        deployService(wsEventing);
+        lifeCycle.startUp(getConfigurationContext(), wsEventing);
+
+    }
+
+    /**
+     * Builds the "EventingService" Axis service with the operations renew, getStatus, subscriptionEnd,
+     * subscribe, unsubscribe and publish. Every operation gets its own message receiver instance.
+     *
+     * @return the populated service description
+     */
+    public static AxisService getEventingService() {
+
+        final String eventingNs = "http://schemas.xmlsoap.org/ws/2004/08/eventing/";
+
+        // { operation name, request action suffix, response action suffix (null = no response action) }
+        final String[][] eventingOperations = {
+                { "renew", "Renew", "RenewResponse" },
+                { "getStatus", "GetStatus", "GetStatusResponse" },
+                { "subscriptionEnd", "SubscriptionEnd", null },
+                { "subscribe", "Subscribe", "SubscribeResponse" },
+                { "unsubscribe", "Unsubscribe", "UnsubscribeResponse" } };
+
+        AxisService service = new AxisService("EventingService");
+
+        for (String[] operation : eventingOperations) {
+            String responseAction = null;
+            if (operation[2] != null) {
+                responseAction = eventingNs + operation[2];
+            }
+            createOperation(service, operation[0],
+                    new org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver(),
+                    eventingNs + operation[1], responseAction);
+        }
+
+        // publish uses its own receiver and action and has no response action
+        createOperation(service, "publish",
+                new org.apache.airavata.wsmg.broker.wseventing.WSEventingPublishMsgReceiver(),
+                "http://www.extreme.indiana.edu/WseNotification", null);
+
+        return service;
+    }
+
+    /**
+     * Builds the programmatic description of the "NotificationService".
+     *
+     * Six operations are registered through {@link #createOperation}: notify,
+     * subscribe, getCurrentMessage, pauseSubscription, resumeSubscription and
+     * unsubscribe. Each one gets its own WSNotificationMsgReceiver instance.
+     *
+     * NOTE(review): the request actions spelled "PauseSubsriptionRequest",
+     * "ResumeSubsriptionRequest" and "UnsubsribeRequest" are kept exactly as
+     * they are; services.xml uses the same spellings, so confirm with existing
+     * clients before correcting them.
+     *
+     * @return the newly created, not yet deployed service
+     */
+    public static AxisService getNotificationService() {
+        final AxisService service = new AxisService("NotificationService");
+
+        createOperation(service, "notify",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/Notify",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/NotifyResponse");
+
+        createOperation(service, "subscribe",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequest",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequestResponse");
+
+        createOperation(service, "getCurrentMessage",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageRequest",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageResponse");
+
+        createOperation(service, "pauseSubscription",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubsriptionRequest",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubscriptionResponse");
+
+        createOperation(service, "resumeSubscription",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubsriptionRequest",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubscriptionResponse");
+
+        createOperation(service, "unsubscribe",
+                new org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver(),
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubsribeRequest",
+                "http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubscribeResponse");
+
+        return service;
+    }
+
+    /**
+     * Adds an in-out operation to a service and, optionally, maps an input
+     * action onto it.
+     *
+     * @param axisService     service that receives the new operation
+     * @param name            local name of the operation
+     * @param messageReceiver receiver installed on the operation
+     * @param inputAction     action mapped to the operation; when {@code null}
+     *                        no action mapping is registered
+     * @param outputAction    output action set on the operation (passed through
+     *                        as-is, including {@code null})
+     */
+    public static void createOperation(AxisService axisService, String name, MessageReceiver messageReceiver,
+            String inputAction, String outputAction) {
+        final InOutAxisOperation operation = new InOutAxisOperation(new QName(name));
+        operation.setMessageReceiver(messageReceiver);
+        operation.setOutputAction(outputAction);
+        axisService.addOperation(operation);
+
+        if (inputAction == null) {
+            // nothing to map: the operation stays reachable only by name
+            return;
+        }
+        axisService.mapActionToOperation(inputAction, operation);
+    }
+
+    /**
+     * Creates a configuration context from a repository directory, using the
+     * {@code conf/axis2.xml} file found below that directory.
+     *
+     * @param repository path of the repository directory
+     * @return a new configuration context backed by the repository
+     * @throws Exception if the repository directory does not exist, or if the
+     *                   context cannot be created
+     */
+    public static ConfigurationContext getNewConfigurationContext(String repository) throws Exception {
+        final File repoDir = new File(repository);
+        if (repoDir.exists()) {
+            final String repoPath = repoDir.getAbsolutePath();
+            return ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,
+                    repoPath + "/conf/axis2.xml");
+        }
+        throw new Exception("repository directory " + repoDir.getAbsolutePath() + " does not exists");
+    }
+
+    /**
+     * Creates a configuration context from an explicit repository location and
+     * an explicit axis2.xml location.
+     *
+     * Unlike the single-argument overload, no existence check is performed on
+     * the repository here; both values are handed to the factory unchanged.
+     *
+     * @param repository path of the repository directory
+     * @param axis2xml   path of the axis2.xml configuration file
+     * @return a new configuration context
+     * @throws Exception if the context cannot be created
+     */
+    public static ConfigurationContext getNewConfigurationContext(String repository, String axis2xml) throws Exception {
+        final ConfigurationContext context = ConfigurationContextFactory.createConfigurationContextFromFileSystem(
+                repository, axis2xml);
+        return context;
+    }
+
+    public static synchronized void stop() throws AxisFault {
+        if (count == 1) {
+            receiver.stop();
+            while (receiver.isRunning()) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    // nothing to do here
+                }
+            }
+            count = 0;
+            // tp.doStop();
+            System.out.print("Server stopped .....");
+        } else {
+            count--;
+        }
+        receiver.getConfigurationContext().terminate();
+    }
+
+    /**
+     * Returns the configuration context held by the shared receiver.
+     *
+     * @return the receiver's configuration context
+     */
+    public static ConfigurationContext getConfigurationContext() {
+        final ConfigurationContext current = receiver.getConfigurationContext();
+        return current;
+    }
+
+    /**
+     * Creates a client-side service context with the addressing module added,
+     * using the fixed repository "target/test-resources/integrationRepo" and no
+     * explicit axis2.xml.
+     *
+     * @param service service to register in the new client configuration
+     * @return the service context created for {@code service}
+     * @throws AxisFault if the configuration, module or service set-up fails
+     */
+    public static ServiceContext createAdressedEnabledClientSide(AxisService service) throws AxisFault {
+        final File addressingMar = getAddressingMARFile();
+        assert (addressingMar.exists());
+
+        final String repoPath = prefixBaseDirectory("target/test-resources/integrationRepo");
+        final ConfigurationContext clientContext = ConfigurationContextFactory
+                .createConfigurationContextFromFileSystem(repoPath, null);
+
+        // build the addressing module from the .mar file and register it
+        final AxisModule addressingModule = DeploymentEngine.buildModule(addressingMar,
+                clientContext.getAxisConfiguration());
+        clientContext.getAxisConfiguration().addModule(addressingModule);
+
+        clientContext.getAxisConfiguration().addService(service);
+
+        final ServiceGroupContext groupContext = clientContext.createServiceGroupContext(service
+                .getAxisServiceGroup());
+        return groupContext.getServiceContext(service);
+    }
+
+    /**
+     * File name filter that accepts names starting with "addressing" and ending
+     * with ".mar". The directory argument is not consulted.
+     */
+    static class AddressingFilter implements FilenameFilter {
+        public boolean accept(File dir, String name) {
+            if (!name.startsWith("addressing")) {
+                return false;
+            }
+            return name.endsWith(".mar");
+        }
+    }
+
+    /**
+     * Locates the addressing module archive in the "modules" directory below
+     * {@code Constants.TESTING_REPOSITORY}.
+     *
+     * The directory is filtered with {@link AddressingFilter}. A missing or
+     * unreadable directory (where {@code listFiles} returns {@code null}) and a
+     * directory without any match are reported with a descriptive exception;
+     * previously they surfaced as a NullPointerException or an
+     * ArrayIndexOutOfBoundsException because the only validation was done with
+     * assertions, which are disabled by default.
+     *
+     * @return the first matching archive
+     * @throws IllegalStateException if the modules directory cannot be listed or
+     *                               contains no addressing archive
+     */
+    private static File getAddressingMARFile() {
+        File modulesDir = new File(prefixBaseDirectory(Constants.TESTING_REPOSITORY + "/modules"));
+        File[] candidates = modulesDir.listFiles(new AddressingFilter());
+        if (candidates == null || candidates.length == 0) {
+            throw new IllegalStateException("No addressing*.mar module found in "
+                    + modulesDir.getAbsolutePath());
+        }
+        // more than one match is still only checked when assertions are enabled
+        assert ((candidates.length == 1));
+        File marFile = candidates[0];
+        assert (marFile.exists());
+        return marFile;
+    }
+
+    /**
+     * Creates a client configuration context from the "integrationRepo"
+     * repository below {@code Constants.TESTING_PATH}, using that repository's
+     * conf/axis2.xml, and adds the addressing module to it.
+     *
+     * @return the new client configuration context
+     * @throws AxisFault if the context or the module cannot be created
+     */
+    public static ConfigurationContext createClientConfigurationContext() throws AxisFault {
+        final File addressingMar = getAddressingMARFile();
+        assert (addressingMar.exists());
+
+        final String repoPath = prefixBaseDirectory(Constants.TESTING_PATH + "/integrationRepo");
+        final String axis2Xml = prefixBaseDirectory(Constants.TESTING_PATH + "/integrationRepo/conf/axis2.xml");
+        final ConfigurationContext clientContext = ConfigurationContextFactory
+                .createConfigurationContextFromFileSystem(repoPath, axis2Xml);
+
+        final AxisModule addressingModule = DeploymentEngine.buildModule(addressingMar,
+                clientContext.getAxisConfiguration());
+        clientContext.getAxisConfiguration().addModule(addressingModule);
+        return clientContext;
+    }
+
+    /**
+     * Creates a client configuration context from the given repository, using
+     * the conf/axis2.xml file below it.
+     *
+     * @param repo path of the client repository
+     * @return the new client configuration context
+     * @throws AxisFault if the context cannot be created
+     */
+    public static ConfigurationContext createClientConfigurationContext(String repo) throws AxisFault {
+        final String axis2Xml = repo + "/conf/axis2.xml";
+        return ConfigurationContextFactory.createConfigurationContextFromFileSystem(repo, axis2Xml);
+    }
+
+    /**
+     * Creates a client-side service context with the addressing module added,
+     * using {@code clientHome} as the repository and no explicit axis2.xml.
+     *
+     * The module is only added to the configuration; it is not engaged here.
+     *
+     * @param service    service to register in the new client configuration
+     * @param clientHome path of the client repository
+     * @return the service context created for {@code service}
+     * @throws AxisFault if the configuration, module or service set-up fails
+     */
+    public static ServiceContext createAdressedEnabledClientSide(AxisService service, String clientHome)
+            throws AxisFault {
+        final File addressingMar = getAddressingMARFile();
+        assert (addressingMar.exists());
+
+        final ConfigurationContext clientContext = ConfigurationContextFactory
+                .createConfigurationContextFromFileSystem(clientHome, null);
+
+        final AxisModule addressingModule = DeploymentEngine.buildModule(addressingMar,
+                clientContext.getAxisConfiguration());
+        clientContext.getAxisConfiguration().addModule(addressingModule);
+
+        clientContext.getAxisConfiguration().addService(service);
+
+        final ServiceGroupContext groupContext = clientContext.createServiceGroupContext(service
+                .getAxisServiceGroup());
+        return groupContext.getServiceContext(service);
+    }
+
+    /**
+     * Returns the given path unchanged.
+     *
+     * An earlier variant prefixed the path with the canonical form of the
+     * "basedir" system property; that resolution is disabled, so this method is
+     * currently an identity function kept for its call sites.
+     *
+     * @param path path to resolve
+     * @return {@code path}, exactly as passed in
+     */
+    public static String prefixBaseDirectory(String path) {
+        final String resolved = path;
+        return resolved;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/addressing-1.5.mar
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/addressing-1.5.mar?rev=1142453&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/addressing-1.5.mar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql Sun Jul  3 15:51:36 2011
@@ -0,0 +1,8 @@
+delete from disQ;
+delete from MaxIDTable;
+delete from MinIDTable;
+delete from specialSubscription;
+delete from subscription;
+delete from msgbox;
+
+

Propchange: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/mysqlCreationScript.sql
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/mysqlCreationScript.sql?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/mysqlCreationScript.sql (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/mysqlCreationScript.sql Sun Jul  3 15:51:36 2011
@@ -0,0 +1,40 @@
+##Used for mySQL database
+CREATE TABLE `subscription` (                                     
+                `SubscriptionId` varchar(200) NOT NULL default '',          
+                `Topics` varchar(255) default '',                               
+                `XPath` varchar(200) default '',                       
+                `ConsumerAddress` varchar(100) default '',                      
+                `ReferenceProperties` blob,
+                `xml` blob,                                                     
+                `wsrm` tinyint(1) NOT NULL default '0',                                  
+                `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'  
+              );
+CREATE TABLE `specialSubscription` (                              
+                       `SubscriptionId` varchar(200) NOT NULL default '',              
+                       `Topics` varchar(255) default '',                               
+                       `XPath` varchar(200) default '',                                
+                       `ConsumerAddress` varchar(100) default '',                      
+                       `ReferenceProperties` blob,                                     
+                       `xml` blob,                                                     
+                       `wsrm` tinyint(1) NOT NULL default '0',                         
+                       `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'  
+                     );               
+
+	
+CREATE TABLE `disQ` (                       
+          `id` bigint(11) NOT NULL auto_increment,  
+          `trackId` varchar(100) default NULL,      
+          `message` longblob,                           
+          `status` int(11) default NULL,    
+          `topic` varchar(255) default '',     
+          PRIMARY KEY  (`id`)                       
+        );	
+	
+CREATE TABLE MaxIDTable(
+	maxID integer
+	);
+	
+CREATE TABLE MinIDTable(
+	minID integer
+	);
+	
\ No newline at end of file

Propchange: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/database_scripts/mysqlCreationScript.sql
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/log4j.properties.txt
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/log4j.properties.txt?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/log4j.properties.txt (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/log4j.properties.txt Sun Jul  3 15:51:36 2011
@@ -0,0 +1,28 @@
+# Root category is DEBUG with the CONSOLE, LOGFILE and BrokerLog appenders; the commented-out line below is the quieter INFO/CONSOLE-only alternative.
+#log4j.rootCategory=INFO, CONSOLE
+log4j.rootCategory=DEBUG, CONSOLE, LOGFILE, BrokerLog
+
+# Set the enterprise logger priority to FATAL
+log4j.logger.org.apache.axis2.enterprise=FATAL
+log4j.logger.de.hunsicker.jalopy.io=FATAL
+log4j.logger.httpclient.wire.header=FATAL
+log4j.logger.org.apache.commons.httpclient=FATAL
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%p] %m%n
+
+# LOGFILE is set to be a File appender using a PatternLayout.
+log4j.appender.LOGFILE=org.apache.log4j.FileAppender
+log4j.appender.LOGFILE.File=${catalina.home}/logs/axis2.log
+log4j.appender.LOGFILE.Append=true
+log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
+
+log4j.appender.BrokerLog=org.apache.log4j.RollingFileAppender
+log4j.appender.BrokerLog.layout=org.apache.log4j.PatternLayout
+log4j.appender.BrokerLog.layout.ConversionPattern=%d [%c] - %m%n
+
+log4j.appender.BrokerLog.File=${catalina.home}/logs/broker.log 
+

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/services.xml
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/services.xml?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/services.xml (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/resources/services.xml Sun Jul  3 15:51:36 2011
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<serviceGroup>
+	<service name="EventingService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+
+		<operation name="renew">
+			<messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+
+			<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew
+			</actionMapping>
+			<outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse
+			</outputActionMapping>
+		</operation>
+
+		<operation name="getStatus">
+
+			<messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+
+			<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus
+			</actionMapping>
+			<outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse
+			</outputActionMapping>
+		</operation>
+
+		<operation name="subscriptionEnd">
+
+			<messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+
+			<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd
+			</actionMapping>
+		</operation>
+
+		<operation name="subscribe">
+
+			<messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+
+			<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe
+			</actionMapping>
+			<outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse
+			</outputActionMapping>
+		</operation>
+
+		<operation name="unsubscribe">
+
+			<messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+			<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe
+			</actionMapping>
+
+			<outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse
+			</outputActionMapping>
+		</operation>
+
+		<operation name="publish">
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wseventing.WSEventingPublishMsgReceiver" />
+			<actionMapping>http://www.extreme.indiana.edu/WseNotification
+			</actionMapping>
+		</operation>
+
+	</service>
+
+	<service name="NotificationService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+
+		<operation name="notify">
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/Notify
+			</actionMapping>
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/NotifyResponse
+			</outputActionMapping>
+		</operation>
+		<operation name="subscribe">
+
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequest
+			</actionMapping>
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequestResponse
+			</outputActionMapping>
+		</operation>
+
+		<operation name="getCurrentMessage">
+
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageRequest
+			</actionMapping>
+
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageResponse
+			</outputActionMapping>
+
+		</operation>
+		<operation name="pauseSubscription">
+
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubsriptionRequest
+			</actionMapping>
+
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubscriptionResponse
+			</outputActionMapping>
+
+		</operation>
+		<operation name="resumeSubscription">
+
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubsriptionRequest
+			</actionMapping>
+
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubscriptionResponse
+			</outputActionMapping>
+
+		</operation>
+
+		<operation name="unsubscribe">
+
+			<messageReceiver
+				class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+
+			<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubsribeRequest
+			</actionMapping>
+
+			<outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubscribeResponse
+			</outputActionMapping>
+
+		</operation>
+
+	</service>
+	
+	 <parameter name="configuration.file.name" locked="false">org.apache.airavata.wsmg.broker.properties</parameter>
+	  
+</serviceGroup>

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/README.txt
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/README.txt?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/README.txt (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/README.txt Sun Jul  3 15:51:36 2011
@@ -0,0 +1,25 @@
+OGCE-WS messenger Quick Start Guide- Sample 1 
+=================================
+
+This sample demonstrates a scenario where multiple producers publish messages under a topic, while multiple consumers receive them.
+
+
+Pre-Requisites
+==============
+
+Apache Ant 1.7.1 or later
+Apache Axis2 1.5 or later
+
+
+
+Steps:
+======
+
+1) Configure and run ws-messenger in any mode. Please refer to the ws-messenger user guide for instructions on running it.
+
+2) Configure 'build.properties' located in the sample directory (set 'axis2.home' to your Axis2 installation).
+
+3) Set the configuration values in the './conf/configurations.properties' file.
+
+4) Run the following command:
+      ant run

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.properties
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.properties?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.properties (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.properties Sun Jul  3 15:51:36 2011
@@ -0,0 +1,23 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+
+axis2.home=

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.xml
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.xml?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.xml (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/build.xml Sun Jul  3 15:51:36 2011
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+
+
+<project name="wsmgsamples" default="run" basedir=".">
+
+	<property file="build.properties" />
+	<property name="lib.path" value="../../" />
+	<property name="dest.dir" value="bin" />
+	<property name="src.dir" value="src" />
+	<property name="conf.dir" location="conf" />
+
+	<path id="broker.libs.path">
+		<fileset dir="${lib.path}">
+			<include name="*.jar" />
+		</fileset>
+
+		<fileset dir="${axis2.home}/lib">
+			<include name="*.jar" />
+		</fileset>
+	</path>
+
+
+	<path id="broker.class.path">
+		<fileset dir="${lib.path}">
+			<include name="*.jar" />
+		</fileset>
+
+		<fileset dir="${axis2.home}/lib">
+			<include name="*.jar" />
+		</fileset>
+		
+		<path location="${conf.dir}" />
+		
+		<pathelement location="${dest.dir}" />
+	</path>
+
+	<target name="clean">
+		<delete dir="${dest.dir}" />
+	</target>
+
+	<target name="build" depends="makeDest">
+		<antcall target="compile" />
+	</target>
+
+	<target name="makeDest">
+		<mkdir dir="${dest.dir}" />
+	</target>
+
+
+	<target name="compile" depends="makeDest">
+		<javac debug="true" srcdir="${src.dir}" destdir="${dest.dir}">
+			<classpath refid="broker.libs.path" />
+		</javac>
+	</target>
+
+
+	<target name="run" depends="build">
+
+		<java classname="org.apache.airavata.wsmg.samples.wse.MultipleProducersConsumersTopics" fork="true">
+			<classpath refid="broker.class.path" />
+		</java>
+
+	</target>
+
+
+</project>

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/conf/configurations.properties
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/conf/configurations.properties?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/conf/configurations.properties (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/conf/configurations.properties Sun Jul  3 15:51:36 2011
@@ -0,0 +1,29 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+broker.eventing.service.epr=http://localhost:8080/axis2/services/EventingService
+broker.notification.service.epr=http://localhost:8080/axis2/services/NotificationService
+consumer.port=6060
+topic.prefix=topic_prefix_
+publish.time.interval=6
+producer.count.per.topic=1
+consumer.count.per.topic=1
+number.of.topics=1

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/util/ConfigKeys.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/util/ConfigKeys.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/util/ConfigKeys.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/util/ConfigKeys.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.util;
+
+/**
+ * Names of the configuration file and of the property keys read by the
+ * sample.
+ */
+public interface ConfigKeys {
+
+	/** Name of the properties file holding the sample configuration. */
+	public static final String CONFIG_FILE_NAME = "configurations.properties";
+
+	// --- broker endpoints ---
+	public static final String BROKER_EVENTING_SERVICE_EPR = "broker.eventing.service.epr";
+	public static final String BROKER_NOTIFICATIONS_SERVICE_EPR = "broker.notification.service.epr";
+
+	// --- consumer / producer set-up ---
+	public static final String CONSUMER_PORT_OFFSET = "consumer.port";
+	public static final String TOPIC_PREFIX = "topic.prefix";
+	public static final String PUBLISH_TIME_INTERVAL = "publish.time.interval";
+	public static final String PRODUCER_COUNT_PER_TOPIC = "producer.count.per.topic";
+	public static final String CONSUMER_COUNT_PER_TOPIC = "consumer.count.per.topic";
+	public static final String NUMBER_OF_TOPICS = "number.of.topics";
+
+	// --- logging ---
+	public static final String LOG_FILE_PATH = "logfile.path";
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Consumer.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Consumer.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Consumer.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.wse;
+
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.MsgBrokerClientException;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.samples.util.ConfigKeys;
+
+/**
+ * Sample consumer thread: starts a local consumer service on the given port,
+ * subscribes it to a topic at the broker and counts the notifications that
+ * arrive until the thread is interrupted.
+ */
+public class Consumer extends Thread {
+
+	/**
+	 * Notification handler that parks every received envelope in an unbounded
+	 * queue, from which {@link Consumer#run()} takes them one by one.
+	 */
+	class NotificationMsgReciever implements ConsumerNotificationHandler {
+
+		private final BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+		public void handleNotification(SOAPEnvelope msgEnvelope) {
+
+			queue.add(msgEnvelope);
+		}
+
+		public BlockingQueue<SOAPEnvelope> getQueue() {
+			return queue;
+		}
+
+	}
+
+	private final Properties configurations;
+	private final int consumerPort;
+	private final String topic;
+
+	/** Number of well-formed notifications received so far. */
+	AtomicLong numberOfMsgRecieved;
+
+	/**
+	 * @param consumerName
+	 *            used as the thread name
+	 * @param port
+	 *            port on which the consumer service is started
+	 * @param topic
+	 *            topic to subscribe to
+	 * @param config
+	 *            sample configuration; must contain the eventing service EPR
+	 */
+	public Consumer(String consumerName, int port, String topic,
+			Properties config) {
+		super(consumerName);
+		consumerPort = port;
+		configurations = config;
+		this.topic = topic;
+		numberOfMsgRecieved = new AtomicLong(0);
+	}
+
+	/**
+	 * Starts the consumer service, subscribes to the topic and then blocks on
+	 * the notification queue, counting messages. On interruption the
+	 * subscription is cancelled and the consumer service is shut down.
+	 */
+	@Override
+	public void run() {
+
+		String brokerLocation = configurations
+				.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR);
+
+		NotificationMsgReciever msgReciever = new NotificationMsgReciever();
+
+		WseMsgBrokerClient client = new WseMsgBrokerClient();
+		client.init(brokerLocation);
+
+		String[] consumerEprs;
+		try {
+			consumerEprs = client.startConsumerService(consumerPort,
+					msgReciever);
+
+		} catch (MsgBrokerClientException e) {
+
+			e.printStackTrace();
+
+			System.out.println("unable to start consumer service, exiting");
+			return;
+		}
+
+		String subscriptionId;
+		try {
+
+			subscriptionId = client.subscribe(consumerEprs[0], topic, null);
+			System.out.println(getName() + "got the subscription id :"
+					+ subscriptionId);
+
+		} catch (MsgBrokerClientException e) {
+
+			e.printStackTrace();
+
+			System.out
+					.println("unable to subscribe for the xpath consumer exiting");
+
+			// The consumer service was already started above; shut it down
+			// so it is not left running after this thread gives up.
+			client.shutdownConsumerService();
+			return;
+		}
+
+		try {
+
+			while (true) {
+				SOAPEnvelope env = msgReciever.getQueue().take();
+
+				try {
+					// Only envelopes whose payload can be reached are
+					// counted; any exception raised here marks the message
+					// as invalid.
+					env.getBody().getFirstElement();
+
+					numberOfMsgRecieved.incrementAndGet();
+
+				} catch (Exception e) {
+					System.err.print("invalid msg recieved");
+				}
+			}
+
+		} catch (InterruptedException ie) {
+
+			try {
+				// unsubscribe from the topic.
+				client.unSubscribe(subscriptionId);
+
+			} catch (MsgBrokerClientException e) {
+
+				e.printStackTrace();
+				System.out.println("unable to unsubscribe, ignoring");
+			}
+
+			// shutdown the consumer service.
+			client.shutdownConsumerService();
+
+			System.out.println("interrupted");
+
+			// Restore the flag only after the clean-up calls above so they
+			// are not disturbed by a pending interrupt.
+			Thread.currentThread().interrupt();
+		}
+
+	}
+
+	/**
+	 * @return the number of notifications counted so far (despite the name,
+	 *         this is a running count, not a sequence number read from a
+	 *         message)
+	 */
+	public long getLatestSeq() {
+		return numberOfMsgRecieved.get();
+	}
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MsgUtil.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MsgUtil.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MsgUtil.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MsgUtil.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.wse;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+
+/**
+ * Helpers for building and inspecting the sample notification payload, which
+ * has the shape {@code <msg><seq/><src/><uuid/></msg>}.
+ */
+public class MsgUtil {
+
+	static final String TAG_MSG = "msg";
+	static final String TAG_SEQ = "seq";
+	static final String TAG_SRC = "src";
+	static final String TAG_UUID = "uuid";
+
+	/**
+	 * Builds a payload element.
+	 *
+	 * @param seq
+	 *            sequence number, written as decimal text into {@code seq}
+	 * @param src
+	 *            text of the {@code src} child
+	 * @param uuid
+	 *            text of the {@code uuid} child
+	 * @return the {@code msg} element with its three children, in the order
+	 *         seq, src, uuid
+	 */
+	public static OMElement createMsg(long seq, String src, String uuid) {
+
+		OMFactory omFactory = OMAbstractFactory.getOMFactory();
+		OMElement root = omFactory.createOMElement(TAG_MSG, null);
+
+		appendTextChild(omFactory, root, TAG_SEQ, Long.toString(seq));
+		appendTextChild(omFactory, root, TAG_SRC, src);
+		appendTextChild(omFactory, root, TAG_UUID, uuid);
+
+		return root;
+	}
+
+	/** Creates a namespace-less child of {@code parent} and sets its text. */
+	private static void appendTextChild(OMFactory omFactory, OMElement parent,
+			String tag, String text) {
+		OMElement child = omFactory.createOMElement(tag, null, parent);
+		child.setText(text);
+	}
+
+	/**
+	 * @param msg
+	 *            payload as produced by {@link #createMsg}
+	 * @return text of the first child element, i.e. the sequence number for
+	 *         payloads built by {@link #createMsg}
+	 */
+	public static String getSeq(OMElement msg) {
+
+		OMElement firstChild = msg.getFirstElement();
+		return firstChild.getText();
+	}
+
+	/**
+	 * Prints {@code msg}, a dash and the serialized element to standard out.
+	 * Failures are reported with a stack trace and otherwise ignored.
+	 * NOTE(review): serialization uses toStringWithConsume(), so the element
+	 * presumably should not be reused afterwards - confirm against Axiom docs.
+	 */
+	public static void print(String msg, OMElement ele) {
+
+		try {
+			String serialized = ele.toStringWithConsume();
+			System.out.println(msg + "-" + serialized);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MultipleProducersConsumersTopics.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MultipleProducersConsumersTopics.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MultipleProducersConsumersTopics.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/MultipleProducersConsumersTopics.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.wse;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.airavata.wsmg.samples.util.ConfigKeys;
+
+/**
+ * Entry point of the sample: for every configured topic it starts a group of
+ * consumers and a group of producers, then periodically writes each
+ * consumer's received-message count to a statistics stream.
+ */
+public class MultipleProducersConsumersTopics {
+
+	// Used from the main thread only; SimpleDateFormat is not thread-safe.
+	static DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
+	/** Built-in settings used for every key the configuration file omits. */
+	private static Properties getDefaults() {
+
+		Properties defaults = new Properties();
+		defaults.setProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR,
+				"http://localhost:8080/axis2/services/EventingService");
+		defaults.setProperty(ConfigKeys.CONSUMER_PORT_OFFSET, "2222");
+
+		defaults.setProperty(ConfigKeys.PUBLISH_TIME_INTERVAL, "5");
+		defaults.setProperty(ConfigKeys.PRODUCER_COUNT_PER_TOPIC, "2");
+		defaults.setProperty(ConfigKeys.CONSUMER_COUNT_PER_TOPIC, "3");
+		defaults.setProperty(ConfigKeys.NUMBER_OF_TOPICS, "5");
+		defaults.setProperty(ConfigKeys.TOPIC_PREFIX, "topic_prefix_");
+		defaults.setProperty(ConfigKeys.LOG_FILE_PATH, "stats.log");
+
+		return defaults;
+	}
+
+	/**
+	 * Loads the configuration file from the class path on top of the
+	 * defaults. A missing or unreadable file is reported and the defaults are
+	 * used instead.
+	 */
+	private static Properties loadConfigurations() {
+
+		Properties configurations = new Properties(getDefaults());
+		try {
+
+			URL url = ClassLoader
+					.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+			if (url == null) {
+				throw new IOException("configuration file not found");
+			}
+
+			// Properties.load leaves the stream open, so close it here.
+			java.io.InputStream in = url.openStream();
+			try {
+				configurations.load(in);
+			} finally {
+				try {
+					in.close();
+				} catch (IOException ignored) {
+					// nothing useful can be done if closing a read-only
+					// stream fails; the properties are already loaded.
+				}
+			}
+
+		} catch (IOException ioe) {
+
+			System.out.println("unable to load configuration file, "
+					+ "default settings will be used");
+		}
+		return configurations;
+	}
+
+	/**
+	 * Opens the statistics file in append mode with auto-flush; falls back to
+	 * standard out when the file cannot be opened.
+	 */
+	private static PrintStream openStatsStream(Properties configurations) {
+
+		try {
+			FileOutputStream outputStream = new FileOutputStream(configurations
+					.getProperty(ConfigKeys.LOG_FILE_PATH), true);
+
+			return new PrintStream(outputStream, true);
+
+		} catch (FileNotFoundException e) {
+			System.out
+					.println("unable to open the file - stats will be printed to console");
+			return System.out;
+		}
+	}
+
+	/**
+	 * Starts all consumers and producers and then reports statistics every
+	 * five seconds; this method does not return normally.
+	 *
+	 * @throws InterruptedException
+	 *             if the main thread is interrupted while sleeping
+	 */
+	public static void main(String[] args) throws InterruptedException {
+
+		Properties configurations = loadConfigurations();
+
+		int numberOfProducers = Integer.parseInt(configurations
+				.getProperty(ConfigKeys.PRODUCER_COUNT_PER_TOPIC));
+
+		int numberOfConsumers = Integer.parseInt(configurations
+				.getProperty(ConfigKeys.CONSUMER_COUNT_PER_TOPIC));
+
+		int numberOfTopics = Integer.parseInt(configurations
+				.getProperty(ConfigKeys.NUMBER_OF_TOPICS));
+
+		String topicPrefix = configurations
+				.getProperty(ConfigKeys.TOPIC_PREFIX);
+
+		int portOffset = Integer.parseInt(configurations
+				.getProperty(ConfigKeys.CONSUMER_PORT_OFFSET));
+
+		List<List<Consumer>> consumers = new ArrayList<List<Consumer>>();
+		List<List<Producer>> producers = new ArrayList<List<Producer>>();
+
+		for (int i = 0; i < numberOfTopics; i++) {
+			String topic = topicPrefix + i;
+
+			// each topic gets its own contiguous range of consumer ports
+			consumers.add(createConsumers(numberOfConsumers, portOffset
+					+ (i * numberOfConsumers), topic, configurations));
+
+			// pause between starting the consumers and the producers
+			TimeUnit.SECONDS.sleep(1);
+
+			producers.add(createProducers(numberOfProducers, topic,
+					configurations));
+		}
+
+		PrintStream printStream = openStatsStream(configurations);
+
+		while (true) {
+
+			Date date = new Date();
+			printStream.println("---- statistics at : ["
+					+ dateFormat.format(date) + "]------");
+			for (List<Consumer> l : consumers) {
+
+				for (Consumer c : l) {
+					printStream.println(c.getName() + " latest seq: "
+							+ c.getLatestSeq());
+
+				}
+			}
+
+			TimeUnit.SECONDS.sleep(5);
+		}
+
+	}
+
+	/**
+	 * Creates and starts {@code number} consumers for {@code topic}, using
+	 * the ports {@code portOffset}, {@code portOffset + 1}, ...
+	 */
+	private static List<Consumer> createConsumers(int number, int portOffset,
+			String topic, Properties config) {
+
+		List<Consumer> ret = new ArrayList<Consumer>();
+
+		for (int i = 0; i < number; i++) {
+
+			int port = portOffset + i;
+			Consumer c = new Consumer("consumer_" + port, port, topic, config);
+			c.start();
+			ret.add(c);
+		}
+
+		return ret;
+	}
+
+	/** Creates and starts {@code number} producers publishing to {@code topic}. */
+	private static List<Producer> createProducers(int number, String topic,
+			Properties config) {
+
+		List<Producer> ret = new ArrayList<Producer>();
+
+		for (int i = 0; i < number; i++) {
+			Producer p = new Producer(
+					String.format("producer_%s_%d", topic, i), topic, config);
+			p.start();
+			ret.add(p);
+		}
+
+		return ret;
+	}
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Producer.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Producer.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Producer.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-topics/src/wsmg/samples/wse/Producer.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.wse;
+
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.axiom.om.OMElement;
+
+import org.apache.airavata.wsmg.client.MsgBrokerClientException;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.samples.util.ConfigKeys;
+
+/**
+ * Sample producer thread: publishes an endless series of numbered messages
+ * to one topic, pausing a random number of seconds between messages.
+ */
+public class Producer extends Thread {
+
+	/** Shortest pause between two publishes, in seconds. */
+	private static final int MIN_SLEEP_SECONDS = 2;
+
+	private final Properties configurations;
+	private final String topicExpression;
+
+	private final Random random;
+
+	/**
+	 * @param producerId
+	 *            used as the thread name and as the message source
+	 * @param topic
+	 *            topic to publish to
+	 * @param config
+	 *            sample configuration (broker EPR, publish interval)
+	 */
+	public Producer(String producerId, String topic, Properties config) {
+		super(producerId);
+		configurations = config;
+		topicExpression = topic;
+		random = new Random();
+	}
+
+	/**
+	 * Publishes messages until the thread is interrupted or publishing
+	 * fails.
+	 */
+	@Override
+	public void run() {
+
+		System.out.println(String
+				.format("producer [%s] starting...", getName()));
+
+		String brokerLocation = configurations
+				.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR);
+
+		int timeIntervalUpperLimit = Integer.parseInt(configurations
+				.getProperty(ConfigKeys.PUBLISH_TIME_INTERVAL));
+
+		WseMsgBrokerClient client = new WseMsgBrokerClient();
+		client.init(brokerLocation);
+
+		try {
+
+			long count = 0;
+			while (true) {
+				UUID uuid = UUID.randomUUID();
+				count++;
+				OMElement omMsg = MsgUtil.createMsg(count, getName(), uuid
+						.toString());
+				client.publish(topicExpression, omMsg);
+
+				TimeUnit.SECONDS
+						.sleep(getRandomSleepTime(timeIntervalUpperLimit));
+			}
+
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+			System.out.println("interruped");
+			// keep the interrupt visible to anything inspecting this thread
+			Thread.currentThread().interrupt();
+		} catch (MsgBrokerClientException f) {
+			f.printStackTrace();
+			System.out
+					.println("unable to publish messages - producer will stop.");
+
+		}
+	}
+
+	/**
+	 * Picks the next pause length.
+	 *
+	 * @param upperLimit
+	 *            exclusive upper bound in seconds
+	 * @return a value drawn uniformly from {@code [0, upperLimit)} and raised
+	 *         to at least {@value #MIN_SLEEP_SECONDS}; the minimum itself
+	 *         when {@code upperLimit} leaves no room above it (including
+	 *         zero or negative limits)
+	 */
+	private int getRandomSleepTime(int upperLimit) {
+
+		// Random.nextInt(bound) needs a positive bound, and any bound up to
+		// the minimum could only ever yield the minimum anyway. This guard
+		// also avoids the division by zero of the former "% upperLimit".
+		if (upperLimit <= MIN_SLEEP_SECONDS) {
+			return MIN_SLEEP_SECONDS;
+		}
+
+		// Bounded nextInt never returns negatives, unlike nextInt() % n,
+		// which collapsed about half of all draws onto the minimum.
+		return Math.max(MIN_SLEEP_SECONDS, random.nextInt(upperLimit));
+	}
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/README.txt
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/README.txt?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/README.txt (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/README.txt Sun Jul  3 15:51:36 2011
@@ -0,0 +1,25 @@
+OGCE-WS messenger Quick Start Guide- Sample 1 
+=================================
+
+This sample demonstrates a scenario where multiple producers publish messages under a topic, while multiple consumers receive them.
+
+
+Pre-Requisites
+==============
+
+Apache Ant 1.7.1 or later
+Apache Axis2 1.5 or later
+
+
+
+Steps:
+======
+
+1) Configure and run ws-messenger in any mode. Please refer to the ws-messenger user guide for instructions on how to run it.
+
+2) configure 'build.properties' located in the sample directory.
+
+3) set configurations in './conf/configurations.properties' file.
+
+4) run following command:
+      ant run

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.properties
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.properties?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.properties (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.properties Sun Jul  3 15:51:36 2011
@@ -0,0 +1,23 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+
+axis2.home=

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.xml
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.xml?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.xml (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/build.xml Sun Jul  3 15:51:36 2011
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!-- The XML declaration above must stay the very first thing in this file;
+     parsers reject a declaration that follows a comment. -->
+
+<project name="wsmgsamples" default="run" basedir=".">
+
+	<!-- axis2.home is read from build.properties -->
+	<property file="build.properties" />
+	<property name="lib.path" value="../../" />
+	<property name="dest.dir" value="bin" />
+	<property name="src.dir" value="src" />
+	<property name="conf.dir" location="conf" />
+
+	<!-- compile-time class path: broker jars plus the Axis2 libraries -->
+	<path id="broker.libs.path">
+		<fileset dir="${lib.path}">
+			<include name="*.jar" />
+		</fileset>
+
+		<fileset dir="${axis2.home}/lib">
+			<include name="*.jar" />
+		</fileset>
+	</path>
+
+
+	<!-- run-time class path: the jars above plus the conf directory and the compiled classes -->
+	<path id="broker.class.path">
+		<fileset dir="${lib.path}">
+			<include name="*.jar" />
+		</fileset>
+
+		<fileset dir="${axis2.home}/lib">
+			<include name="*.jar" />
+		</fileset>
+
+		<path location="${conf.dir}" />
+
+		<pathelement location="${dest.dir}" />
+	</path>
+
+	<target name="clean">
+		<delete dir="${dest.dir}" />
+	</target>
+
+	<target name="build" depends="makeDest">
+		<antcall target="compile" />
+	</target>
+
+	<target name="makeDest">
+		<mkdir dir="${dest.dir}" />
+	</target>
+
+
+	<target name="compile" depends="makeDest">
+		<javac debug="true" srcdir="${src.dir}" destdir="${dest.dir}">
+			<classpath refid="broker.libs.path" />
+		</javac>
+	</target>
+
+
+	<!-- builds the sample and launches it in a separate JVM -->
+	<target name="run" depends="build">
+
+		<java classname="org.apache.airavata.wsmg.samples.wse.MultipleProducersConsumers" fork="true">
+			<classpath refid="broker.class.path" />
+		</java>
+
+	</target>
+
+
+</project>

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/conf/configurations.properties
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/conf/configurations.properties?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/conf/configurations.properties (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/conf/configurations.properties Sun Jul  3 15:51:36 2011
@@ -0,0 +1,28 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+broker.eventing.service.epr=http://localhost:8080/axis2/services/EventingService
+broker.notification.service.epr=http://localhost:8080/axis2/services/NotificationService
+consumer.port=6060
+topic.xpath=/msg/src
+publish.time.interval=5
+producer.count=2
+consumer.count=3

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/src/wsmg/samples/util/ConfigKeys.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/src/wsmg/samples/util/ConfigKeys.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/src/wsmg/samples/util/ConfigKeys.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/samples/wse-multiple-producers-consumers-xpath/src/wsmg/samples/util/ConfigKeys.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.wsmg.samples.util;
+
+/**
+ * Property keys of the "multiple producers / consumers with XPath" sample.
+ * Values are looked up in {@link #CONFIG_FILE_NAME}.
+ */
+public interface ConfigKeys {
+
+	/** Name of the properties resource holding the sample settings. */
+	String CONFIG_FILE_NAME = "configurations.properties";
+
+	// ---- broker end points -------------------------------------------------
+
+	/** Key of the broker's eventing service end point reference. */
+	String BROKER_EVENTING_SERVICE_EPR = "broker.eventing.service.epr";
+
+	/** Key of the broker's notification service end point reference. */
+	String BROKER_NOTIFICATIONS_SERVICE_EPR = "broker.notification.service.epr";
+
+	// ---- consumer side -----------------------------------------------------
+
+	/** Key {@code consumer.location}. */
+	String CONSUMER_EPR = "consumer.location";
+
+	/** Key {@code consumer.port}; presumably the first consumer port - verify against callers. */
+	String CONSUMER_PORT_OFFSET = "consumer.port";
+
+	/** Key of the number of consumers. */
+	String CONSUMER_COUNT = "consumer.count";
+
+	// ---- topic selection ---------------------------------------------------
+
+	/** Key {@code topic.simple}. */
+	String TOPIC_SIMPLE = "topic.simple";
+
+	/** Key {@code topic.xpath}. */
+	String TOPIC_XPATH = "topic.xpath";
+
+	// ---- producer side and environment ------------------------------------
+
+	/** Key of the number of producers. */
+	String PRODUCER_COUNT = "producer.count";
+
+	/** Key {@code publish.time.interval}. */
+	String PUBLISH_TIME_INTERVAL = "publish.time.interval";
+
+	/** Key {@code axis2.repo}. */
+	String AXIS2_REPO = "axis2.repo";
+}