You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/11/03 11:59:31 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha/client RMSender.java
jaliya 2004/11/03 02:59:31
Modified: sandesha/src/org/apache/sandesha/client RMSender.java
Log:
The source address and the sync/async properties are now taken from the client.
Revision Changes Path
1.9 +140 -100 ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java
Index: RMSender.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- RMSender.java 20 Oct 2004 10:23:28 -0000 1.8
+++ RMSender.java 3 Nov 2004 10:59:31 -0000 1.9
@@ -24,6 +24,7 @@
import org.apache.axis.Message;
import org.apache.axis.MessageContext;
import org.apache.axis.SimpleChain;
+import org.apache.axis.client.Call;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.configuration.SimpleProvider;
@@ -46,6 +47,7 @@
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.server.Sender;
+import org.apache.sandesha.server.ServerStorageManager;
import org.apache.sandesha.server.queue.ServerQueue;
import org.apache.sandesha.ws.rm.MessageNumber;
import org.apache.sandesha.ws.rm.RMHeaders;
@@ -54,6 +56,8 @@
import org.apache.sandesha.ws.rm.providers.RMProvider;
import org.apache.sandesha.ws.utility.Identifier;
+import com.sun.jndi.url.rmi.rmiURLContext;
+
public class RMSender extends BasicHandler {
/**
@@ -72,6 +76,12 @@
private Sender sender = null;
public void invoke(MessageContext msgContext) throws AxisFault {
+
+ //Get the URL to send the message.
+
+ storageManager = new ClientStorageManager();
+ initializeRMSender(msgContext, storageManager);
+
//Check whether we have messages or not in the queue.
//If yes, just add
//If no, need to add a priority message.
@@ -83,102 +93,19 @@
//Insert the messae
//Return null ; Later return for callback.
- storageManager = new ClientStorageManager();
-
- if (!senderStarted) {
- //Pass the storageManager to the Sender.
- sender = new Sender(storageManager);
- Thread senderThread = new Thread(sender);
- //senderThread.setDaemon(true);
- senderThread.start();
- }
-
- if (!serverStarted) {
- sas = new SimpleAxisServer();
- serverStarted = true;
- try {
- SimpleProvider sp = new SimpleProvider();
- sas.setMyConfig(sp);
- //SOAPService myService = new SOAPService(new RPCProvider());
-
- Handler addrHanlder = new AddressingHandler();
- Handler rmHandler = new RMServerRequestHandler();
-
- SimpleChain shc = new SimpleChain();
- shc.addHandler(addrHanlder);
- shc.addHandler(rmHandler);
-
- //Set the provider to the CRMProvider so that it will use the
- //ClientStorageManger..
- //Need to revise this use of CRMProvider.
-
- RMProvider rmProvider = new RMProvider();
- rmProvider.setClient(true);
-
- SOAPService myService = new SOAPService(shc, rmProvider, null);
- // customize the webservice
- JavaServiceDesc desc = new JavaServiceDesc();
- myService.setOption("className",
- "samples.userguide.example3.MyService");
- myService.setOption("allowedMethods", "*");
-
- //Add Handlers ; Addressing and ws-rm before the service.
-
- desc.setName("MyService");
- myService.setServiceDescription(desc);
-
- // deploy the service to server
- sp.deployService("MyService", myService);
- // finally start the server
- sas.setServerSocket(new ServerSocket(8090));
-
- Thread serverThread = new Thread(sas);
- //serverThread.setDaemon(true);
- serverThread.start();
- } catch (IOException ex) {
- ex.printStackTrace();
- }
-
- }
try {
//This should be changed so the inital sequence is sent by the
// client.
//This way we can verify that we are shifting from one set of
// messages to the other.
//Also helps when we introduce the callback mechanism.
-
//we need the following sequence of methods to support this
- /*
- *
- * if(!storageManager.isSequenceExist(sequenceID)){
- * storageManager.addSequence(sequenceID);
- * nextMsgNumber=storageManager.getNextMessageNumber(sequenceID);
- * }else{
- * nextMsgNumber=storageManager.getNextMessageNumber(sequenceID); }
- */
-
- //Addressing information currently hard-coded.
- //---------------------------------------------------------------
- AddressingHeaders addrHeaders = new AddressingHeaders();
- From from = new From(new Address(
- "http://localhost:8090/axis/services/MyService"));
- addrHeaders.setFrom(from);
-
- To to = new To(
- new Address(
- "http://127.0.0.1:9090/axis/services/EchoStringService?wsdl"));
- addrHeaders.setTo(to);
- ReplyTo replyTo = new ReplyTo(new Address(
- "http://localhost:8090/axis/services/MyService"));
- addrHeaders.setReplyTo(replyTo);
- //---------------------------------------------------------------
+ AddressingHeaders addrHeaders = getAddressingHeaders(msgContext);
- //At this moment we don't know a sequence
long nextMsgNumber = storageManager
.getNextMessageNumber(Constants.CLIENT_DEFAULD_SEQUENCE_ID);
- //UUIDGenerator to generate the messageID
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
if (nextMsgNumber == 1) {
@@ -188,15 +115,16 @@
//System.out.println("First Message");
//Set the tempUUID
- String tempUUID = "ABCDEFGH";
+ String tempUUID = uuidGen.nextUUID();
RMMessageContext createSeqRMMsgContext = getCreateSeqRMContext(
msgContext, addrHeaders, tempUUID);
- createSeqRMMsgContext.setMessageID("uuid:ABCDEFGH");
+ createSeqRMMsgContext.setMessageID("uuid:" + tempUUID);
//Create a sequence first.
storageManager
.addSequence(Constants.CLIENT_DEFAULD_SEQUENCE_ID);
storageManager.setTemporaryOutSequence(
- Constants.CLIENT_DEFAULD_SEQUENCE_ID, "uuid:ABCDEFGH");
+ Constants.CLIENT_DEFAULD_SEQUENCE_ID, "uuid:"
+ + tempUUID);
storageManager.addCreateSequenceRequest(createSeqRMMsgContext);
//RMMessageContext reqRMMsgContext =
@@ -228,16 +156,29 @@
.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST);
reqRMMsgContext.setMessageID("uuid:" + uuidGen.nextUUID());
storageManager.insertOutgoingMessage(reqRMMsgContext);
-
+ //System.out.println("This is NOT the first
+ // message..........................");
}
+ /*
+ * RMMessageContext rmMessageContext= new RMMessageContext();
+ *
+ * rmMessageContext.setMsgContext(msgContext);
+ * rmMessageContext.setSequenceID("abc");
+ * storageManager.insertRequestMessage(rmMessageContext);
+ *
+ * storageManager.setTemporaryOutSequence("abc","def");
+ * storageManager.setApprovedOutSequence("def","pqr");
+ *
+ */
+
} catch (Exception ex) {
ex.printStackTrace();
}
- ServerQueue sq = ServerQueue.getInstance();
- sq.displayIncomingMap();
- sq.displayOutgoingMap();
- sq.displayPriorityQueue();
+ //ServerQueue sq= ServerQueue.getInstance();
+ // sq.displayIncomingMap();
+ // sq.displayOutgoingMap();
+ // sq.displayPriorityQueue();
// RMSender will hang at this point.
//while(storageManager.getResponseMessage(MessageID)!=null){
@@ -256,6 +197,11 @@
*/
private RMMessageContext getReqRMContext(MessageContext msgContext,
AddressingHeaders addrHeaders, String uuid, long msgNo) {
+
+ //Get the URL to send the message.
+ String toAddress = (String) msgContext
+ .getProperty(MessageContext.TRANS_URL);
+
// Create the RMMessageContext to hold the Request message.
RMMessageContext reqRMMsgContext = new RMMessageContext();
MessageContext messageContext = new MessageContext(msgContext
@@ -286,10 +232,7 @@
//Set the addrssing headers to RMMessageContext.
reqRMMsgContext.setAddressingHeaders(addrHeaders);
- //These should be taken from the prpoerties.
- //TODO
- reqRMMsgContext
- .setOutGoingAddress("http://127.0.0.1:9090/axis/services/EchoStringService?wsdl");
+ reqRMMsgContext.setOutGoingAddress(toAddress);
//SOAPEnvelope resEnvelope = EnvelopeCreator
// .createServiceRequestEnvelope(uuid, reqRMMsgContext,
// Constants.CLIENT);
@@ -302,6 +245,9 @@
private RMMessageContext getCreateSeqRMContext(MessageContext msgContext,
AddressingHeaders addrHeaders, String uuid)
throws MalformedURIException {
+
+ String toAddress = (String) msgContext
+ .getProperty(MessageContext.TRANS_URL);
//Set the action
Action action = new Action(new URI(Constants.ACTION_CREATE_SEQUENCE));
addrHeaders.setAction(action);
@@ -311,10 +257,7 @@
createSeqRMMsgContext.setAddressingHeaders(addrHeaders);
//Set the outgoing address these need to be corrected.
- //TODO
- //These should be taken from the properties.
- createSeqRMMsgContext
- .setOutGoingAddress("http://127.0.0.1:9090/axis/services/EchoStringService?wsdl");
+ createSeqRMMsgContext.setOutGoingAddress(toAddress);
SOAPEnvelope resEnvelope = EnvelopeCreator
.createCreateSequenceEnvelope(uuid, createSeqRMMsgContext,
@@ -331,6 +274,103 @@
createSeqRMMsgContext
.setMessageType(Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST);
return createSeqRMMsgContext;
+ }
+
+    /**
+     * Starts the client-side RM infrastructure on first use: the background
+     * Sender thread and an embedded SimpleAxisServer that hosts "MyService"
+     * behind the addressing and RM request handlers, so that asynchronous
+     * responses can be received by the client.
+     *
+     * @param msgContext     context of the message being sent (not read here)
+     * @param storageManager storage manager handed to the Sender
+     */
+    private void initializeRMSender(MessageContext msgContext,
+            IStorageManager storageManager) {
+
+        if (!senderStarted) {
+            //Pass the storageManager to the Sender.
+            sender = new Sender(storageManager);
+            Thread senderThread = new Thread(sender);
+            //senderThread.setDaemon(true);
+            senderThread.start();
+            //Record the start; without this every invocation would spawn
+            //another Sender thread.
+            senderStarted = true;
+        }
+
+        if (!serverStarted) {
+            sas = new SimpleAxisServer();
+            //NOTE(review): the flag is set before the socket is bound, so a
+            //failed bind below is not retried on later invocations.
+            serverStarted = true;
+            try {
+                SimpleProvider sp = new SimpleProvider();
+                sas.setMyConfig(sp);
+                //SOAPService myService = new SOAPService(new RPCProvider());
+
+                Handler addrHanlder = new AddressingHandler();
+                Handler rmHandler = new RMServerRequestHandler();
+
+                //Request chain: addressing first, then WS-RM.
+                SimpleChain shc = new SimpleChain();
+                shc.addHandler(addrHanlder);
+                shc.addHandler(rmHandler);
+
+                //Need to use the RMProvider at the client side to handle the
+                //Asynchronous responses.
+                RMProvider rmProvider = new RMProvider();
+                //This is the switch used to inform the RMProvider about the
+                // side that it operates.
+                rmProvider.setClient(true);
+
+                SOAPService myService = new SOAPService(shc, rmProvider, null);
+
+                JavaServiceDesc desc = new JavaServiceDesc();
+                myService.setOption("className",
+                        "samples.userguide.example3.MyService");
+                myService.setOption("allowedMethods", "*");
+
+                //Add Handlers ; Addressing and ws-rm before the service.
+                desc.setName("MyService");
+                myService.setServiceDescription(desc);
+
+                //deploy the service to server
+                sp.deployService("MyService", myService);
+                //finally start the server
+                //Start the simple axis server in port 8090
+                sas.setServerSocket(new ServerSocket(8090));
+
+                Thread serverThread = new Thread(sas);
+                //serverThread.setDaemon(true);
+                serverThread.start();
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+
+        }
+
+    }
+
+    /**
+     * Builds the WS-Addressing headers for an outgoing message from the
+     * properties supplied by the client.
+     *
+     * The target address is read from MessageContext.TRANS_URL. The Call
+     * property "isAsync" selects the mode: when it is the string "true",
+     * From and ReplyTo point at the client-side service on the host given by
+     * the Call property "sourceAddress"; otherwise From is the anonymous URI
+     * and no ReplyTo is set.
+     *
+     * @param msgContext context of the message being sent
+     * @return the populated addressing headers
+     * @throws MalformedURIException if one of the addresses is not a valid URI
+     */
+    private AddressingHeaders getAddressingHeaders(MessageContext msgContext)
+            throws MalformedURIException {
+        String toAddress = (String) msgContext
+                .getProperty(MessageContext.TRANS_URL);
+        Call call = (Call) msgContext.getProperty(MessageContext.CALL);
+
+        //Compare the property by value; == on strings only tests reference
+        //identity. A missing Call is treated as a synchronous client.
+        boolean isAsync = call != null
+                && "true".equals(call.getProperty("isAsync"));
+
+        AddressingHeaders addrHeaders = new AddressingHeaders();
+
+        From from;
+        if (isAsync) {
+            //Get the host address of the source machine.
+            String sourceHost = (String) call.getProperty("sourceAddress");
+            //NOTE(review): the embedded listener in this class is bound to
+            //port 8090 while this address uses 8080 - confirm which is meant.
+            String listenerAddress = "http://" + sourceHost
+                    + ":8080/axis/services/MyService";
+            from = new From(new Address(listenerAddress));
+            ReplyTo replyTo = new ReplyTo(new Address(listenerAddress));
+            addrHeaders.setReplyTo(replyTo);
+        } else {
+            //Need to use the anonymous_URI if the client is synchronous.
+            from = new From(new Address(Constants.ANONYMOUS_URI));
+        }
+        //Attach From in both modes; previously the anonymous From was
+        //created but never set on the headers.
+        addrHeaders.setFrom(from);
+
+        //Set the target endpoint URL
+        To to = new To(new Address(toAddress));
+        addrHeaders.setTo(to);
+
+        return addrHeaders;
}
}