You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2011/08/01 22:14:39 UTC
svn commit: r1152917 - in
/incubator/airavata/trunk/ws-messaging/messagebroker/src:
main/java/org/apache/airavata/wsmg/client/
main/java/org/apache/airavata/wsmg/messenger/
samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/
samples/wse-topi...
Author: lahiru
Date: Mon Aug 1 20:14:36 2011
New Revision: 1152917
URL: http://svn.apache.org/viewvc?rev=1152917&view=rev
Log:
unifying ws-messenger client implementation, removing deprecated client and use new client in all the locations including samples and test.
Added:
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonMsgBrokerClient.java
Removed:
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonClientProcessing.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseClientAPI.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsmgClientAPI.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntClientAPI.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WseClientAPIConsumerTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WseClientAPIPublishTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WseClientAPIRoundTripTests.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WseClientAPISubscriptionsTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WseClientAPITest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WsntClientAPIConsumerTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WsntClientAPIPublishTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WsntClientAPIRoundTripTests.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/client/WsntClientAPISubscriptionTests.java
Modified:
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MessageBrokerClient.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MsgBrokerClientException.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseMsgBrokerClient.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntMsgBrokerClient.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/Consumer.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-topic-subscription/src/wsmg/samples/wse/TopicSubscribe.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/NotificationManager.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/PublisherThread.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/Subscription.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSETest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSNTTest.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathSubscription.java
Added: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonMsgBrokerClient.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonMsgBrokerClient.java?rev=1152917&view=auto
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonMsgBrokerClient.java (added)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/CommonMsgBrokerClient.java Mon Aug 1 20:14:36 2011
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.client;
+
+import org.apache.airavata.wsmg.client.msgbox.MessagePuller;
+import org.apache.airavata.wsmg.client.msgbox.MsgboxHandler;
+import org.apache.airavata.wsmg.commons.WsmgVersion;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.log4j.Logger;
+
+import javax.xml.namespace.QName;
+
+
+abstract class CommonMsgBrokerClient implements MessageBrokerClient {
+
+ protected final static OMFactory factory = OMAbstractFactory.getOMFactory();
+ private final static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+
+ private final static org.apache.log4j.Logger logger = Logger.getLogger(CommonMsgBrokerClient.class);
+ protected ConsumerServer xs;
+ protected MsgboxHandler msgboxHandler = new MsgboxHandler();
+
+ private long socketTimeout = 200000L;
+
+ public CommonMsgBrokerClient(long timeout) {
+ socketTimeout = timeout;
+ WsmgVersion.requireVersionOrExit(WsmgVersion.getSpecVersion());
+ }
+
+
+ public String[] getConsumerServiceEndpointReference() {
+ if (xs == null) {
+ throw new RuntimeException("Consumer server is not started yet");
+ }
+ return xs.getConsumerServiceEPRs();
+ }
+
+
+ public void setTimeOutInMilliSeconds(long timeout) {
+ socketTimeout = timeout;
+ }
+
+ public CommonMsgBrokerClient() {
+ WsmgVersion.requireVersionOrExit(WsmgVersion.getSpecVersion());
+
+ }
+
+ public long getTimeOutInMilliSeconds() {
+ return socketTimeout;
+ }
+
+
+ public String subscribeMsgBox(String brokerService, EndpointReference msgBoxEpr, String topic, String xpath)
+ throws MsgBrokerClientException {
+
+ String msgBoxId = null;
+ String msgBoxUrl = msgBoxEpr.getAddress();
+ int biginIndex = msgBoxUrl.indexOf("clientid");
+ msgBoxId = msgBoxUrl.substring(biginIndex + "clientid".length() + 1);
+
+ if (msgBoxId == null)
+ throw new RuntimeException("Invalid Message Box EPR, message box ID is missing");
+
+ return subscribe(msgBoxEpr.getAddress(), topic, xpath);
+ }
+
+ public String subscribeMsgBox(EndpointReference msgBoxEpr, String topicExpression, String xpathExpression,
+ long expireTime) throws MsgBrokerClientException {
+
+ String msgBoxEventSink = msgBoxEpr.getAddress();
+
+ String formattedEventSink = null;
+
+ if (msgBoxEpr.getAddress().contains("clientid")) {
+ formattedEventSink = msgBoxEventSink;
+ } else {
+ if (msgBoxEpr.getAllReferenceParameters() == null)
+ throw new MsgBrokerClientException("Invalid Message Box EPR, no reference parameters found");
+ String msgBoxId = msgBoxEpr.getAllReferenceParameters()
+ .get(new QName("http://org.apache.airavata/xgws/msgbox/2004/", "MsgBoxAddr")).getText();
+ if (msgBoxId == null)
+ throw new MsgBrokerClientException("Invalid Message Box EPR, reference parameter MsgBoxAddr is missing");
+ String format = msgBoxEventSink.endsWith("/") ? "%sclientid/%s" : "%s/clientid/%s";
+
+ formattedEventSink = String.format(format, msgBoxEventSink, msgBoxId);
+
+ }
+
+ return subscribe(new EndpointReference(formattedEventSink), topicExpression, xpathExpression, expireTime);
+
+ }
+
+ // ------------------------Message box user
+ // API----------------------------//
+
+ public EndpointReference createPullMsgBox(String msgBoxLocation, long timeout) throws MsgBrokerClientException {
+
+ EndpointReference ret = null;
+ try {
+ ret = msgboxHandler.createPullMsgBox(msgBoxLocation, timeout);
+ } catch (MsgBrokerClientException e) {
+ throw e;
+ }
+
+ return ret;
+ }
+
+ public EndpointReference createPullMsgBox(String msgBoxServerLoc) throws MsgBrokerClientException {
+ EndpointReference ret = null;
+ ret = msgboxHandler.createPullMsgBox(msgBoxServerLoc);
+ return ret;
+ }
+
+ public MessagePuller startPullingEventsFromMsgBox(EndpointReference msgBoxEpr, NotificationHandler handler,
+ long interval, long timeout) throws MsgBrokerClientException {
+
+ MessagePuller ret = null;
+ ret = msgboxHandler.startPullingEventsFromMsgBox(msgBoxEpr, handler, interval, timeout);
+ return ret;
+ }
+
+ public MessagePuller startPullingFromExistingMsgBox(EndpointReference msgBoxAddr, NotificationHandler handler,
+ long interval, long timeout) throws MsgBrokerClientException {
+
+ MessagePuller ret = null;
+ ret = msgboxHandler.startPullingFromExistingMsgBox(msgBoxAddr, handler, interval, timeout);
+ return ret;
+ }
+
+ public String deleteMsgBox(EndpointReference msgBoxEpr, long timeout) throws MsgBrokerClientException {
+ String ret = null;
+ ret = msgboxHandler.deleteMsgBox(msgBoxEpr, timeout);
+ return ret;
+ }
+
+ public void stopPullingEventsFromMsgBox(MessagePuller msgPuller) {
+ msgboxHandler.stopPullingEventsFromMsgBox(msgPuller);
+ }
+}
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MessageBrokerClient.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MessageBrokerClient.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MessageBrokerClient.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MessageBrokerClient.java Mon Aug 1 20:14:36 2011
@@ -21,10 +21,13 @@
package org.apache.airavata.wsmg.client;
+import org.apache.airavata.wsmg.client.msgbox.MessagePuller;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
+import java.rmi.RemoteException;
+
public interface MessageBrokerClient {
/**
@@ -94,4 +97,87 @@ public interface MessageBrokerClient {
*/
public String subscribeMsgBox(EndpointReference msgBoxEpr, String topicExpression, String xpathExpression,
long expireTime) throws MsgBrokerClientException;
+
+
+
+ /**
+ * This method can be used to shutdown the consumer server started
+ */
+ public void shutdownConsumerService();
+
+ /**
+ *
+ * @param port
+ * @param handler
+ * @return
+ * @throws MsgBrokerClientException
+ */
+ public String[] startConsumerService(int port, ConsumerNotificationHandler handler) throws MsgBrokerClientException;
+
+ /**
+ *
+ * @param msgBoxAddr
+ * @param handler
+ * @param backoff
+ * @param timeout
+ * @return
+ * @throws AxisFault
+ */
+ public MessagePuller startPullingFromExistingMsgBox(EndpointReference msgBoxAddr, NotificationHandler handler,
+ long backoff, long timeout) throws MsgBrokerClientException;
+
+ /**
+ *
+ * @param msgBoxAddr
+ * @param handler
+ * @param backoff
+ * @param timeout
+ * @return
+ * @throws RemoteException
+ */
+ public MessagePuller startPullingEventsFromMsgBox(EndpointReference msgBoxAddr, NotificationHandler handler,
+ long backoff, long timeout) throws MsgBrokerClientException;
+
+ // public EndpointReference createPullMsgBox(String msgBoxServerLoc, String userAgent) throws RemoteException ;
+
+ /**
+ *
+ * @param msgPuller
+ */
+ public void stopPullingEventsFromMsgBox(org.apache.airavata.wsmg.client.msgbox.MessagePuller msgPuller)throws MsgBrokerClientException;
+
+ /**
+ *
+ * @return
+ */
+ public String[] getConsumerServiceEndpointReference();
+
+ /**
+ *
+ * @param brokerService
+ * @param msgBoxEpr
+ * @param topic
+ * @param xpath
+ * @return
+ */
+ public String subscribeMsgBox(String brokerService, EndpointReference msgBoxEpr, String topic, String xpath)throws MsgBrokerClientException;
+
+ /**
+ *
+ * @param msgBoxLocation
+ * @param timeout
+ * @return
+ * @throws MsgBrokerClientException
+ */
+ public EndpointReference createPullMsgBox(String msgBoxLocation, long timeout) throws MsgBrokerClientException;
+
+ /**
+ *
+ * @param msgBoxServerLoc
+ * @return
+ * @throws MsgBrokerClientException
+ */
+ public EndpointReference createPullMsgBox(String msgBoxServerLoc) throws MsgBrokerClientException;
+
+
}
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MsgBrokerClientException.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MsgBrokerClientException.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MsgBrokerClientException.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/MsgBrokerClientException.java Mon Aug 1 20:14:36 2011
@@ -21,7 +21,9 @@
package org.apache.airavata.wsmg.client;
-public class MsgBrokerClientException extends Exception {
+import org.apache.axis2.AxisFault;
+
+public class MsgBrokerClientException extends AxisFault {
private static final long serialVersionUID = 5609577226544941146L;
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseMsgBrokerClient.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseMsgBrokerClient.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseMsgBrokerClient.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WseMsgBrokerClient.java Mon Aug 1 20:14:36 2011
@@ -37,7 +37,7 @@ import org.apache.axis2.addressing.Endpo
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
-public class WseMsgBrokerClient implements MessageBrokerClient {
+public class WseMsgBrokerClient extends CommonMsgBrokerClient implements MessageBrokerClient {
private EndpointReference brokerEndpointRef = null;
@@ -132,31 +132,7 @@ public class WseMsgBrokerClient implemen
return subscriptionId;
}
- public String subscribeMsgBox(EndpointReference msgBoxEpr, String topicExpression, String xpathExpression,
- long expireTime) throws MsgBrokerClientException {
-
- String msgBoxEventSink = msgBoxEpr.getAddress();
-
- String formattedEventSink = null;
-
- if (msgBoxEpr.getAddress().contains("clientid")) {
- formattedEventSink = msgBoxEventSink;
- } else {
- if (msgBoxEpr.getAllReferenceParameters() == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, no reference parameters found");
- String msgBoxId = msgBoxEpr.getAllReferenceParameters()
- .get(new QName("http://org.apache.airavata/xgws/msgbox/2004/", "MsgBoxAddr")).getText();
- if (msgBoxId == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, reference parameter MsgBoxAddr is missing");
- String format = msgBoxEventSink.endsWith("/") ? "%sclientid/%s" : "%s/clientid/%s";
-
- formattedEventSink = String.format(format, msgBoxEventSink, msgBoxId);
- }
-
- return subscribe(new EndpointReference(formattedEventSink), topicExpression, xpathExpression, expireTime);
-
- }
public boolean unSubscribe(String subscriptionId) throws MsgBrokerClientException {
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntMsgBrokerClient.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntMsgBrokerClient.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntMsgBrokerClient.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/client/WsntMsgBrokerClient.java Mon Aug 1 20:14:36 2011
@@ -37,7 +37,7 @@ import org.apache.axis2.addressing.Endpo
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
-public class WsntMsgBrokerClient implements MessageBrokerClient {
+public class WsntMsgBrokerClient extends CommonMsgBrokerClient implements MessageBrokerClient {
protected long timeoutInMilliSeconds = WsmgCommonConstants.DEFAULT_CLIENT_SOCKET_TIME_OUT_MILLIES;
@@ -187,29 +187,4 @@ public class WsntMsgBrokerClient impleme
public void shutdownConsumerService() {
consumerServerHandler.shutdownConsumerService();
}
-
- public String subscribeMsgBox(EndpointReference msgBoxEpr, String topicExpression, String xpathExpression,
- long expireTime) throws MsgBrokerClientException {
- String msgBoxEventSink = msgBoxEpr.getAddress();
-
- String formattedEventSink = null;
-
- if (msgBoxEpr.getAddress().contains("clientid")) {
- formattedEventSink = msgBoxEventSink;
- } else {
- if (msgBoxEpr.getAllReferenceParameters() == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, no reference parameters found");
- String msgBoxId = msgBoxEpr.getAllReferenceParameters()
- .get(new QName("http://org.apache.airavata/xgws/msgbox/2004/", "MsgBoxAddr")).getText();
- if (msgBoxId == null)
- throw new MsgBrokerClientException("Invalid Message Box EPR, reference parameter MsgBoxAddr is missing");
- String format = msgBoxEventSink.endsWith("/") ? "%sclientid/%s" : "%s/clientid/%s";
-
- formattedEventSink = String.format(format, msgBoxEventSink, msgBoxId);
-
- }
-
- return subscribe(new EndpointReference(formattedEventSink), topicExpression, xpathExpression, expireTime);
- }
-
}
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Mon Aug 1 20:14:36 2011
@@ -94,7 +94,7 @@ public class ConsumerUrlManager {
RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
- FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
+ FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
if (info != null) {
logger.debug(String.format("message was delivered to " + "previously %d times failed url : %s",
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/Consumer.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/Consumer.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-multiple-producers-consumers/src/wsmg/samples/wse/Consumer.java Mon Aug 1 20:14:36 2011
@@ -26,7 +26,6 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import com.sun.tools.doclets.internal.toolkit.MethodWriter;
-import org.apache.airavata.wsmg.client.WseClientAPI;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-topic-subscription/src/wsmg/samples/wse/TopicSubscribe.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-topic-subscription/src/wsmg/samples/wse/TopicSubscribe.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-topic-subscription/src/wsmg/samples/wse/TopicSubscribe.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/samples/wse-topic-subscription/src/wsmg/samples/wse/TopicSubscribe.java Mon Aug 1 20:14:36 2011
@@ -26,7 +26,6 @@ import java.net.URL;
import java.util.Properties;
import java.io.*;
import org.apache.airavata.wsmg.client.MsgBrokerClientException;
-import org.apache.airavata.wsmg.client.WseClientAPI;
import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.airavata.wsmg.samples.util.ConfigKeys;
import java.io.FileInputStream;
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/NotificationManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/NotificationManager.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/NotificationManager.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/NotificationManager.java Mon Aug 1 20:14:36 2011
@@ -25,16 +25,12 @@ import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WseClientAPI;
-import org.apache.airavata.wsmg.client.WsmgClientAPI;
-import org.apache.airavata.wsmg.client.WsntClientAPI;
+import org.apache.airavata.wsmg.client.*;
import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.AxisFault;
public class NotificationManager implements ConsumerNotificationHandler {
- private WsmgClientAPI client = null;
+ private MessageBrokerClient client = null;
private String[] eprs = null;
private String brokerLocation = null;
private String protocol = null;
@@ -45,7 +41,7 @@ public class NotificationManager impleme
private int multipleThreadSupportIndex = 1;
public NotificationManager(String brokerLocationIn, int consumerServerPortIn, String protocolIn,
- int numMultiThreadSupportPerSub) throws AxisFault {
+ int numMultiThreadSupportPerSub) throws MsgBrokerClientException {
this.brokerLocation = brokerLocationIn;
this.consumerServerPort = consumerServerPortIn;
@@ -54,13 +50,15 @@ public class NotificationManager impleme
if (client == null) {
if (protocol.equalsIgnoreCase("wse")) {
- WseClientAPI wseClient = new WseClientAPI();
- wseClient.setTimeOutInMilliSeconds(200000000);
+ WsntMsgBrokerClient wseClient = new WsntMsgBrokerClient();
+ wseClient.init(this.brokerLocation);
+ wseClient.setTimeoutInMilliSeconds(200000000);
eprs = wseClient.startConsumerService(consumerServerPort, this);
client = wseClient;
} else {
- WsntClientAPI wsntClient = new WsntClientAPI();
- wsntClient.setTimeOutInMilliSeconds(200000000);
+ WsntMsgBrokerClient wsntClient = new WsntMsgBrokerClient();
+ wsntClient.init(this.brokerLocation);
+ wsntClient.setTimeoutInMilliSeconds(200000000);
eprs = wsntClient.startConsumerService(consumerServerPort, this);
client = wsntClient;
}
@@ -88,7 +86,7 @@ public class NotificationManager impleme
multipleThreadSupportIndex = 1;
}
- String subscriptionId = client.subscribe(brokerLocation, eprs[0] + "user" + multipleThreadSupportIndex++,
+ String subscriptionId = client.subscribe(eprs[0] + "user" + multipleThreadSupportIndex++,
topicExpression, xpathExpression);
subscriptionIds.add(subscriptionId);
Subscription subscription = new Subscription(client, subscriptionId, topicExpression, xpathExpression, this,
@@ -96,27 +94,27 @@ public class NotificationManager impleme
return subscription;
}
- public void cleanup() throws AxisFault {
+ public void cleanup() throws MsgBrokerClientException {
- WseClientAPI wseClient = null;
- WsntClientAPI wsntClient = null;
+ WseMsgBrokerClient wseClient = null;
+ WsntMsgBrokerClient wsntClient = null;
if ("wse".equalsIgnoreCase(this.protocol)) {
- wseClient = (WseClientAPI) client;
+ wseClient = (WseMsgBrokerClient) client;
} else {
- wsntClient = (WsntClientAPI) client;
+ wsntClient = (WsntMsgBrokerClient) client;
}
if (subscriptionIds != null) {
if (wseClient != null) {
while (!subscriptionIds.isEmpty()) {
String subId = subscriptionIds.remove();
- wseClient.unSubscribe(brokerLocation, subId, null);
+ wseClient.unSubscribe(subId);
}
} else {
while (!subscriptionIds.isEmpty()) {
String subId = subscriptionIds.remove();
- wsntClient.unSubscribe(brokerLocation, subId, null);
+ wsntClient.unSubscribe(subId);
}
}
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/PublisherThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/PublisherThread.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/PublisherThread.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/PublisherThread.java Mon Aug 1 20:14:36 2011
@@ -23,9 +23,7 @@ package performance_evaluator.rtt;
import java.util.concurrent.CountDownLatch;
-import org.apache.airavata.wsmg.client.WseClientAPI;
-import org.apache.airavata.wsmg.client.WsmgClientAPI;
-import org.apache.airavata.wsmg.client.WsntClientAPI;
+import org.apache.airavata.wsmg.client.*;
public class PublisherThread extends Thread {
private String brokerURL;
@@ -37,7 +35,7 @@ public class PublisherThread extends Thr
private String payload = "";
String msg = "";
- private WsmgClientAPI client = null;
+ private MessageBrokerClient client = null;
int trackId = 0;
int threadId = 0;
@@ -51,15 +49,17 @@ public class PublisherThread extends Thr
this.threadId = threadIdIn;
if ("wse".equalsIgnoreCase(protocolIn)) {
- WseClientAPI wseClient = new WseClientAPI();
- wseClient.setTimeOutInMilliSeconds(0);
- client = wseClient;
+ WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+ wseMsgBrokerClient.setTimeoutInMilliSeconds(0);
+ wseMsgBrokerClient.init(brokerURL);
+ client = wseMsgBrokerClient;
} else {
- WsntClientAPI wsntClient = new WsntClientAPI();
- wsntClient.setTimeOutInMilliSeconds(0);
- client = wsntClient;
+ WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
+ wsntMsgBrokerClient.setTimeoutInMilliSeconds(0);
+ wsntMsgBrokerClient.init(brokerURL);
+ client = wsntMsgBrokerClient;
}
}
@@ -80,7 +80,7 @@ public class PublisherThread extends Thr
+ "</perf:trackId></perf:trackInfo>"
+ "<perf:payload>" + payload + "</perf:payload></perf:performancetest>";
long publishStartTime = System.currentTimeMillis();
- client.publish(brokerURL, topic, msg);
+ client.publish(topic, msg);
totPublishTime += System.currentTimeMillis() - publishStartTime;
trackId++;
}
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/Subscription.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/Subscription.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/performance_evaluator/rtt/Subscription.java Mon Aug 1 20:14:36 2011
@@ -22,7 +22,7 @@
package performance_evaluator.rtt;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WsmgClientAPI;
+import org.apache.airavata.wsmg.client.MessageBrokerClient;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.log4j.Logger;
@@ -33,14 +33,14 @@ public class Subscription {
private String topic;
private final static org.apache.log4j.Logger logger = Logger.getLogger(Subscription.class);
private ConsumerNotificationHandler handler;
- private WsmgClientAPI client;
+ private MessageBrokerClient client;
private EndpointReference messageBoxEPR;
private String xpath;
private String brokerURL;
private String protocol;
- public Subscription(WsmgClientAPI clientIn, String subscriptionID, String topic,
+ public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic,
ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
super();
this.subscriptionID = subscriptionID;
@@ -51,7 +51,7 @@ public class Subscription {
this.protocol = protocolIn;
}
- public Subscription(WsmgClientAPI clientIn, String subscriptionID, String topic, String xpath,
+ public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic, String xpath,
ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
super();
this.client = clientIn;
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSETest.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSETest.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSETest.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSETest.java Mon Aug 1 20:14:36 2011
@@ -29,7 +29,7 @@ import java.util.concurrent.LinkedBlocki
import junit.framework.TestCase;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WseClientAPI;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.junit.Test;
@@ -63,32 +63,32 @@ public class BrokerWSETest extends TestC
long value = System.currentTimeMillis();
String msg = String.format("<msg> current time is : %d </msg>", value);
- WseClientAPI clientApi = new WseClientAPI(null, 1000000);
-
+ WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+ wseMsgBrokerClient.init(brokerEPR);
int consumerPort = 6767;
- String[] consumerEPRs = clientApi.startConsumerService(consumerPort, this);
+ String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
assertTrue(consumerEPRs.length > 0);
String topic = "WseRoundTripTestTopic";
- String subscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], topic);
- clientApi.subscribe(brokerEPR, consumerEPRs[0], topic, "/foo/bar");
+ String subscriptionID = wseMsgBrokerClient.subscribe(brokerEPR, consumerEPRs[0], topic);
+ wseMsgBrokerClient.subscribe(consumerEPRs[0], topic, "/foo/bar");
- clientApi.publish(brokerEPR, topic, msg);
+ wseMsgBrokerClient.publish(topic, msg);
- clientApi.publish(brokerEPR, topic, "<foo><bar>Test</bar></foo>");
+ wseMsgBrokerClient.publish(topic, "<foo><bar>Test</bar></foo>");
Thread.sleep(2000);
try {
- clientApi.unSubscribe(brokerEPR, subscriptionID, null);
+ wseMsgBrokerClient.unSubscribe(subscriptionID);
} catch (AxisFault e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
- clientApi.shutdownConsumerService();
+ wseMsgBrokerClient.shutdownConsumerService();
} catch (AxisFault e) {
e.printStackTrace();
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSNTTest.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSNTTest.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSNTTest.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/broker/BrokerWSNTTest.java Mon Aug 1 20:14:36 2011
@@ -29,7 +29,7 @@ import java.util.concurrent.LinkedBlocki
import junit.framework.TestCase;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WsntClientAPI;
+import org.apache.airavata.wsmg.client.WsntMsgBrokerClient;
import org.apache.airavata.wsmg.util.test.TestUtilServer;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
@@ -62,40 +62,42 @@ public class BrokerWSNTTest extends Test
long value = System.currentTimeMillis();
String msg = String.format("<msg> current time is : %d </msg>", value);
- WsntClientAPI clientApi = new WsntClientAPI(1000000);
+ WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
int consumerPort = 6767;
String brokerEPR = "http://127.0.0.1:5555/axis2/services/NotificationService";
- String[] consumerEPRs = clientApi.startConsumerService(consumerPort, this);
+ wsntMsgBrokerClient.init(brokerEPR);
+ String[] consumerEPRs = wsntMsgBrokerClient.startConsumerService(consumerPort, this);
assertTrue(consumerEPRs.length > 0);
String topic = "WsntRoundTripTestTopic";
- String topicSubscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], topic);
+ String topicSubscriptionID = wsntMsgBrokerClient.subscribe(brokerEPR, consumerEPRs[0], topic);
System.out.println("topic subscription id: " + topicSubscriptionID);
- String xpathSubscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], topic, "/foo/bar");
+ String xpathSubscriptionID = wsntMsgBrokerClient.subscribe(consumerEPRs[0], topic, "/foo/bar");
System.out.println("xpath subscription id: " + xpathSubscriptionID);
- clientApi.publish(brokerEPR, topic, msg);
- clientApi.publish(brokerEPR, topic, "<foo><bar>eligible to</bar></foo>");
+ wsntMsgBrokerClient.publish(topic, msg);
+
+ wsntMsgBrokerClient.publish(topic, "<foo><bar>eligible to</bar></foo>");
Thread.sleep(2000);
try {
- clientApi.unSubscribe(brokerEPR, topicSubscriptionID, null);
- clientApi.unSubscribe(brokerEPR, xpathSubscriptionID, null);
+ wsntMsgBrokerClient.unSubscribe(topicSubscriptionID);
+ wsntMsgBrokerClient.unSubscribe(xpathSubscriptionID);
} catch (AxisFault e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
- clientApi.shutdownConsumerService();
+ wsntMsgBrokerClient.shutdownConsumerService();
} catch (AxisFault e) {
e.printStackTrace();
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java Mon Aug 1 20:14:36 2011
@@ -29,7 +29,7 @@ import java.util.concurrent.LinkedBlocki
import junit.framework.TestCase;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WseClientAPI;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -101,7 +101,8 @@ public class TestAddtionalWseXpathAndTop
EndpointReference brokerEpr = new EndpointReference(
configs.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR));
- WseClientAPI topicOnlyReceiverApi = new WseClientAPI(brokerEpr);
+ WseMsgBrokerClient topicOnlyReceiverApi = new WseMsgBrokerClient();
+ topicOnlyReceiverApi.init(brokerEpr.getAddress());
NotificationReciever topicOnlyMsgReceiver = new NotificationReciever("Topic Only");
String[] topicConsumerEPRs = topicOnlyReceiverApi.startConsumerService(consumerPort, topicOnlyMsgReceiver);
@@ -110,19 +111,21 @@ public class TestAddtionalWseXpathAndTop
String topicOnlySubId = topicOnlyReceiverApi.subscribe(brokerEpr.getAddress(), topicConsumerEPRs[0], topic);
- WseClientAPI xpathAndTopicReceiverApi = new WseClientAPI(brokerEpr);
+ WseMsgBrokerClient xpathAndTopicReceiverApi = new WseMsgBrokerClient();
+ xpathAndTopicReceiverApi.init(brokerEpr.getAddress());
NotificationReciever topicAndXpathMsgReceiver = new NotificationReciever("Topic And Xpath");
String[] topicAndXpathConsumerEPRs = xpathAndTopicReceiverApi.startConsumerService(consumerPort + 1,
topicAndXpathMsgReceiver);
assertTrue("invalid consumer eprs returned", topicAndXpathConsumerEPRs.length > 0);
- String topicAndXpathSubId = xpathAndTopicReceiverApi.subscribe(brokerEpr.getAddress(),
+ String topicAndXpathSubId = xpathAndTopicReceiverApi.subscribe(
topicAndXpathConsumerEPRs[0], topic, xpathExpression);
- WseClientAPI senderApi = new WseClientAPI(brokerEpr);
- senderApi.publish(brokerEpr.getAddress(), topic, matchingMsg);
- senderApi.publish(brokerEpr.getAddress(), topic, unmatchingMsg);
+ WseMsgBrokerClient senderApi = new WseMsgBrokerClient();
+ senderApi.init(brokerEpr.getAddress());
+ senderApi.publish(topic, matchingMsg);
+ senderApi.publish(topic, unmatchingMsg);
try {
@@ -139,10 +142,10 @@ public class TestAddtionalWseXpathAndTop
fail("interrupted while waiting for message");
}
- topicOnlyReceiverApi.unSubscribe(brokerEpr.getAddress(), topicOnlySubId, null);
+ topicOnlyReceiverApi.unSubscribe(topicOnlySubId);
topicOnlyReceiverApi.shutdownConsumerService();
- xpathAndTopicReceiverApi.unSubscribe(brokerEpr.getAddress(), topicAndXpathSubId, null);
+ xpathAndTopicReceiverApi.unSubscribe(topicAndXpathSubId);
xpathAndTopicReceiverApi.shutdownConsumerService();
} catch (AxisFault e) {
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java Mon Aug 1 20:14:36 2011
@@ -31,7 +31,7 @@ import javax.xml.stream.XMLStreamExcepti
import junit.framework.TestCase;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WseClientAPI;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
@@ -90,18 +90,19 @@ public class TestWseXpathAndTopicSubscri
String brokerEPR = configs.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR);
- WseClientAPI clientApi = new WseClientAPI(new EndpointReference(brokerEPR));
+ WseMsgBrokerClient msgBrokerClient = new WseMsgBrokerClient();
+ msgBrokerClient.init(brokerEPR);
- String[] consumerEPRs = clientApi.startConsumerService(consumerPort, this);
+ String[] consumerEPRs = msgBrokerClient.startConsumerService(consumerPort, this);
assertTrue(consumerEPRs.length > 0);
String xpathExpression = "/c/b/a";
- String subscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], null, xpathExpression);
+ String subscriptionID = msgBrokerClient.subscribe(consumerEPRs[0], null, xpathExpression);
- clientApi.publish(validMsg);
- clientApi.publish(invalidMsg);
+ msgBrokerClient.publish(null,validMsg);
+ msgBrokerClient.publish(null,invalidMsg);
try {
SOAPEnvelope env = getMsgQueue().take();
@@ -128,8 +129,8 @@ public class TestWseXpathAndTopicSubscri
fail("invalid xml recieved: " + e.getMessage());
}
- clientApi.unSubscribe(brokerEPR, subscriptionID, null);
- clientApi.shutdownConsumerService();
+ msgBrokerClient.unSubscribe(subscriptionID);
+ msgBrokerClient.shutdownConsumerService();
} catch (AxisFault e) {
e.printStackTrace();
Modified: incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathSubscription.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathSubscription.java?rev=1152917&r1=1152916&r2=1152917&view=diff
==============================================================================
--- incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathSubscription.java (original)
+++ incubator/airavata/trunk/ws-messaging/messagebroker/src/test/java/wsmg/matching/XPath/TestWseXpathSubscription.java Mon Aug 1 20:14:36 2011
@@ -31,7 +31,7 @@ import javax.xml.stream.XMLStreamExcepti
import junit.framework.TestCase;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
-import org.apache.airavata.wsmg.client.WseClientAPI;
+import org.apache.airavata.wsmg.client.*;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
@@ -90,18 +90,19 @@ public class TestWseXpathSubscription ex
String brokerEPR = configs.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR);
- WseClientAPI clientApi = new WseClientAPI(new EndpointReference(brokerEPR));
+ WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+ wseMsgBrokerClient.init(brokerEPR);
- String[] consumerEPRs = clientApi.startConsumerService(consumerPort, this);
+ String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
assertTrue(consumerEPRs.length > 0);
String xpathExpression = "/c/b/a";
- String subscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], null, xpathExpression);
+ String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], null, xpathExpression);
- clientApi.publish(validMsg);
- clientApi.publish(invalidMsg);
+ wseMsgBrokerClient.publish(null,validMsg);
+ wseMsgBrokerClient.publish(null,invalidMsg);
try {
SOAPEnvelope env = getMsgQueue().take();
@@ -128,8 +129,8 @@ public class TestWseXpathSubscription ex
fail("invalid xml recieved: " + e.getMessage());
}
- clientApi.unSubscribe(brokerEPR, subscriptionID, null);
- clientApi.shutdownConsumerService();
+ wseMsgBrokerClient.unSubscribe(subscriptionID);
+ wseMsgBrokerClient.shutdownConsumerService();
} catch (AxisFault e) {
e.printStackTrace();
@@ -154,19 +155,20 @@ public class TestWseXpathSubscription ex
String brokerEPR = configs.getProperty(ConfigKeys.BROKER_EVENTING_SERVICE_EPR);
- WseClientAPI clientApi = new WseClientAPI(new EndpointReference(brokerEPR));
+ WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+ wseMsgBrokerClient.init(brokerEPR);
- String[] consumerEPRs = clientApi.startConsumerService(consumerPort, this);
+ String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
assertTrue(consumerEPRs.length > 0);
String xpathExpression = "/c/b/a";
String topicExpression = "XpathAndTopicTestWse";
- String subscriptionID = clientApi.subscribe(brokerEPR, consumerEPRs[0], topicExpression, xpathExpression);
+ String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topicExpression, xpathExpression);
- clientApi.publish(brokerEPR, topicExpression, validMsg);
- clientApi.publish(brokerEPR, topicExpression, invalidMsg);
+ wseMsgBrokerClient.publish(topicExpression, validMsg);
+ wseMsgBrokerClient.publish(topicExpression, invalidMsg);
try {
SOAPEnvelope env = getMsgQueue().take();
@@ -193,8 +195,8 @@ public class TestWseXpathSubscription ex
fail("invalid xml recieved: " + e.getMessage());
}
- clientApi.unSubscribe(brokerEPR, subscriptionID, null);
- clientApi.shutdownConsumerService();
+ wseMsgBrokerClient.unSubscribe(subscriptionID);
+ wseMsgBrokerClient.shutdownConsumerService();
} catch (AxisFault e) {
e.printStackTrace();