You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by as...@apache.org on 2004/05/08 10:34:36 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha RMSequence.java
aslom 2004/05/08 01:34:36
Modified: sandesha/src/org/apache/sandesha RMSequence.java
Log:
refactored behemoth uber processMessage method into smaller pieces (needs more refactoring ...)
Revision Changes Path
1.11 +521 -486 ws-fx/sandesha/src/org/apache/sandesha/RMSequence.java
Index: RMSequence.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/RMSequence.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- RMSequence.java 7 May 2004 17:39:38 -0000 1.10
+++ RMSequence.java 8 May 2004 08:34:36 -0000 1.11
@@ -60,9 +60,12 @@
import java.util.List;
import java.util.Map;
import java.util.Vector;
+import javax.xml.soap.SOAPException;
+import java.lang.reflect.Method;
+
/**
* class RMSequence
- *
+ *
* @author
* Amila Navarathna<br>
* Jaliya Ekanayaka<br>
@@ -74,7 +77,7 @@
public class RMSequence {
/**
* Field log
- */
+ */
protected static Log log = LogFactory.getLog(RMSequence.class.getName());
/**
* Field messageList
@@ -108,11 +111,11 @@
* Field lastProcessedMessageNumber
*/
private long lastProcessedMessageNumber;
-
+
/**
* Constructor RMSequence
- *
- * @param identifier
+ *
+ * @param identifier
*/
public RMSequence(Identifier identifier) {
this.sequenceIdentifier = identifier;
@@ -120,7 +123,7 @@
nextMessageNo = 1;
maxMessageNumber = 0;
}
-
+
/**
* Constructor RMSequence
*/
@@ -128,288 +131,320 @@
messageList = new HashMap();
lastProcessedMessageNumber = 0;
}
-
+
/**
* Method getSequenceIdentifier
- *
+ *
* @return Identifier
*/
-
+
public Identifier getSequenceIdentifier() {
return sequenceIdentifier;
}
/**
- * Method processMessage
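+ * Method processMessage: if the message numbered lastProcessedMessageNumber + 1 is present in messageList, hands it to processMessageInSeq.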
*/
-
private void processMessage() {
RMMessage rmMessge;
-
- if (messageList.get(new Long(lastProcessedMessageNumber + 1))
- != null) {
-
+
+ if (messageList.get(new Long(lastProcessedMessageNumber + 1)) != null)
+ {
rmMessge =
(RMMessage) messageList.get(
- new Long(lastProcessedMessageNumber + 1));
- ServiceDesc serviceDesc = rmMessge.getServiceDesc();
- OperationDesc operation = rmMessge.getOperation();
- try {
- SOAPEnvelope reqEnv =
- rmMessge.getRequestMessage().getSOAPEnvelope();
- Vector bodies = reqEnv.getBodyElements();
- RPCElement body = null;
- // Find the first "root" body element, which is the RPC call.
- for (int bNum = 0;
- body == null && bNum < bodies.size();
- bNum++) {
- // If this is a regular old SOAPBodyElement, and it's a root,
- // we're probably a non-wrapped doc/lit service. In this case,
- // we deserialize the element, and create an RPCElement "wrapper"
- // around it which points to the correct method.
- // FIXME : There should be a cleaner way to do this...
- if (!(bodies.get(bNum) instanceof RPCElement)) {
- SOAPBodyElement bodyEl =
- (SOAPBodyElement) bodies.get(bNum);
- // igors: better check if bodyEl.getID() != null
- // to make sure this loop does not step on SOAP-ENC objects
- // that follow the parameters! FIXME?
- if (bodyEl.isRoot()
- && operation != null
- && bodyEl.getID() == null) {
- ParameterDesc param = operation.getParameter(bNum);
- // at least do not step on non-existent parameters!
- if (param != null) {
- Object val =
- bodyEl.getValueAsType(param.getTypeQName());
- body =
- new RPCElement(
- "",
- operation.getName(),
- new Object[] { val });
- }
- }
- } else {
- body = (RPCElement) bodies.get(bNum);
- }
+ new Long(lastProcessedMessageNumber + 1));
+
+ processMessageInSeq(rmMessge);
+ }
+ }
+
+ private void processMessageInSeq(RMMessage rmMessge) {
+ ServiceDesc serviceDesc = rmMessge.getServiceDesc();
+ OperationDesc operation = rmMessge.getOperation();
+ try {
+ SOAPEnvelope reqEnv =
+ rmMessge.getRequestMessage().getSOAPEnvelope();
+ Vector bodies = reqEnv.getBodyElements();
+ RPCElement body = null;
+ // Find the first "root" body element, which is the RPC call.
+ for (int bNum = 0; bNum < bodies.size(); bNum++)
+ {
+ body = extractBody(bodies, bNum, operation, body);
+ if(body != null) {
+ break;
}
- String methodName = body.getMethodName();
- Vector args = null;
- try {
- args = body.getParams();
- } catch (SAXException e) {
- if (e.getException() != null)
- throw e.getException();
- throw e;
+ }
+ if(body == null) {
+ throw new AxisFault("could not find body in request");
+ }
+ String methodName = body.getMethodName();
+ Vector args = null;
+ try {
+ args = body.getParams();
+ } catch (SAXException e) {
+ if (e.getException() != null) {
+ throw e.getException();
}
- int numArgs = args.size();
- // Create the array we'll use to hold the actual parameter
- // values. We know how big to make it from the metadata.
- Object[] argValues = new Object[operation.getNumParams()];
-
- // A place to keep track of the out params (INOUTs and OUTs)
- ArrayList outs = new ArrayList();
-
- // Put the values contained in the RPCParams into an array
- // suitable for passing to java.lang.reflect.Method.invoke()
- // Make sure we respect parameter ordering if we know about it
- // from metadata, and handle whatever conversions are necessary
- // (values -> Holders, etc)
- for (int i = 0; i < numArgs; i++) {
- RPCParam rpcParam = (RPCParam) args.get(i);
- Object value = rpcParam.getValue();
-
- // first check the type on the paramter
- ParameterDesc paramDesc = rpcParam.getParamDesc();
-
- // if we found some type info try to make sure the value type is
- // correct. For instance, if we deserialized a xsd:dateTime in
- // to a Calendar and the service takes a Date, we need to convert
- if (paramDesc != null && paramDesc.getJavaType() != null) {
-
- // Get the type in the signature (java type or its holder)
- Class sigType = paramDesc.getJavaType();
-
- // Convert the value into the expected type in the signature
- value = JavaUtils.convert(value, sigType);
-
- rpcParam.setValue(value);
- if (paramDesc.getMode() == ParameterDesc.INOUT) {
- outs.add(rpcParam);
- }
- }
-
- // Put the value (possibly converted) in the argument array
- // make sure to use the parameter order if we have it
- if (paramDesc == null || paramDesc.getOrder() == -1) {
- argValues[i] = value;
- } else {
- argValues[paramDesc.getOrder()] = value;
- }
-
+ throw e;
+ }
+ int numArgs = args.size();
+ // Create the array we'll use to hold the actual parameter
+ // values. We know how big to make it from the metadata.
+ Object[] argValues = new Object[operation.getNumParams()];
+
+ // A place to keep track of the out params (INOUTs and OUTs)
+ ArrayList outs = new ArrayList();
+
+ // Put the values contained in the RPCParams into an array
+ // suitable for passing to java.lang.reflect.Method.invoke()
+ // Make sure we respect parameter ordering if we know about it
+ // from metadata, and handle whatever conversions are necessary
+ // (values -> Holders, etc)
+ prepareArgumentsForInvocation(numArgs, args, outs, argValues);
+ // See if any subclasses want a crack at faulting on a bad operation
+ // FIXME : Does this make sense here???
+ // String allowedMethods = (String) service.getOption("allowedMethods");
+ // checkMethodName(msgContext, allowedMethods, operation.getName());
+
+ // Now create any out holders we need to pass in
+ prepareOutHolders(numArgs, argValues, operation, outs);
+
+ //OK! Now we can invoke the method
+ Object responseObject = null;
+ Method methodToInvoke = operation.getMethod();
+ responseObject = methodToInvoke.invoke(rmMessge.getServiceObject(), argValues);
+
+ if (responseObject != null) {
+ //now we have response
+ sendResponse(methodName, responseObject, rmMessge);
+
+ } else {
+
+ //now we don't have response so we are sending only ack
+ sendOnlyAck(rmMessge);
+
+ }
+
+ } catch (AxisFault e) {
+
+ log.equals(e); //FIXME
+ } catch (Exception e) {
+ log.error(e); //FIXME
+
+ }
+ //the counter can be increased now; this has to happen after the method has been invoked
+ lastProcessedMessageNumber++;
+ //now the next message may be processed,
+ //so check again
+ processMessage(); // NOTE: recursive call
+ }
+
+ private RPCElement extractBody(Vector bodies, int bNum, OperationDesc operation, RPCElement body) throws Exception {
+ // If this is a regular old SOAPBodyElement, and it's a root,
+ // we're probably a non-wrapped doc/lit service. In this case,
+ // we deserialize the element, and create an RPCElement "wrapper"
+ // around it which points to the correct method.
+ // FIXME : There should be a cleaner way to do this...
+ if (!(bodies.get(bNum) instanceof RPCElement)) {
+ SOAPBodyElement bodyEl =
+ (SOAPBodyElement) bodies.get(bNum);
+ // igors: better check if bodyEl.getID() != null
+ // to make sure this loop does not step on SOAP-ENC objects
+ // that follow the parameters! FIXME?
+ if (bodyEl.isRoot()
+ && operation != null
+ && bodyEl.getID() == null) {
+ ParameterDesc param = operation.getParameter(bNum);
+ // at least do not step on non-existent parameters!
+ if (param != null) {
+ Object val =
+ bodyEl.getValueAsType(param.getTypeQName());
+ body =
+ new RPCElement(
+ "",
+ operation.getName(),
+ new Object[] { val });
}
- // See if any subclasses want a crack at faulting on a bad operation
- // FIXME : Does this make sense here???
- // String allowedMethods = (String) service.getOption("allowedMethods");
- // checkMethodName(msgContext, allowedMethods, operation.getName());
-
- // Now create any out holders we need to pass in
- int count = numArgs;
- for (int i = 0; i < argValues.length; i++) {
-
- // We are interested only in OUT/INOUT
- ParameterDesc param = operation.getParameter(i);
- if (param.getMode() == ParameterDesc.IN)
- continue;
+ }
+ } else {
+ body = (RPCElement) bodies.get(bNum);
+ }
+ return body;
+ }
- Class holderClass = param.getJavaType();
- if (holderClass != null
- && Holder.class.isAssignableFrom(holderClass)) {
- int index = count;
- // Use the parameter order if specified or just stick them to the end.
- if (param.getOrder() != -1) {
- index = param.getOrder();
- } else {
- count++;
- }
- // If it's already filled, don't muck with it
- if (argValues[index] != null) {
- continue;
- }
- argValues[index] = holderClass.newInstance();
- // Store an RPCParam in the outs collection so we
- // have an easy and consistent way to write these
- // back to the client below
- RPCParam p =
- new RPCParam(param.getQName(), argValues[index]);
- p.setParamDesc(param);
- outs.add(p);
- } /*else {
- }*/
+ private void prepareArgumentsForInvocation(int numArgs, Vector args, ArrayList outs, Object[] argValues) {
+ for (int i = 0; i < numArgs; i++) {
+ RPCParam rpcParam = (RPCParam) args.get(i);
+ Object value = rpcParam.getValue();
+
+ // first check the type on the parameter
+ ParameterDesc paramDesc = rpcParam.getParamDesc();
+
+ // if we found some type info try to make sure the value type is
+ // correct. For instance, if we deserialized a xsd:dateTime in
+ // to a Calendar and the service takes a Date, we need to convert
+ if (paramDesc != null && paramDesc.getJavaType() != null) {
+
+ // Get the type in the signature (java type or its holder)
+ Class sigType = paramDesc.getJavaType();
+
+ // Convert the value into the expected type in the signature
+ value = JavaUtils.convert(value, sigType);
+
+ rpcParam.setValue(value);
+ if (paramDesc.getMode() == ParameterDesc.INOUT) {
+ outs.add(rpcParam);
}
+ }
+
+ // Put the value (possibly converted) in the argument array
+ // make sure to use the parameter order if we have it
+ if (paramDesc == null || paramDesc.getOrder() == -1) {
+ argValues[i] = value;
+ } else {
+ argValues[paramDesc.getOrder()] = value;
+ }
+
+ }
+ }
- //OK! Now we can invoke the method
- Object responseObject = null;
- responseObject =
- operation.getMethod().invoke(
- rmMessge.getServiceObject(),
- argValues);
-
- if (responseObject != null) {
- //now we have response
- SOAPEnvelope responseEnv = new SOAPEnvelope();
- SOAPBodyElement reponceBody =
- new SOAPBodyElement(
- responseEnv.createName(
- methodName + "Response",
- "xsd",
- Constants.RESPONSE_NAME_SPACE));
-
- Name name =
- responseEnv.createName(
- methodName + "Return",
- null,
- Constants.RESPONSE_NAME_SPACE);
- reponceBody.addChildElement(name).addTextNode(
- responseObject.toString());
-
- responseEnv.addBodyElement(reponceBody);
-
- String fromAddress =
- rmMessge
- .getAddressingHeaders()
- .getFrom()
- .getAddress()
- .toString();
- String replayToAddress = null;
- replayToAddress =
- rmMessge
- .getAddressingHeaders()
- .getReplyTo()
- .getAddress()
- .toString();
-
- //two cases here
- //1. send the response to from address :- from address and replay to address are same
- // in this case we going to send both ack and resonse in same message.
- //2. send the response to replay to address :- from and reply to address are different
- // in this case response is to reply to and ack is to from
-
- if (fromAddress.equals(replayToAddress)) {
- //send the response to from (see above comment)
- sendResponseToFrom(rmMessge, responseEnv);
- } else {
- //send the response to reply to (see above comment)
- sendResponseToReplayTo(rmMessge, responseEnv);
- }
-
+ private void prepareOutHolders(int numArgs, Object[] argValues, OperationDesc operation, ArrayList outs)
+ throws InstantiationException, IllegalAccessException
+ {
+ int count = numArgs;
+ for (int i = 0; i < argValues.length; i++) {
+
+ // We are interested only in OUT/INOUT
+ ParameterDesc param = operation.getParameter(i);
+ if (param.getMode() == ParameterDesc.IN)
+ continue;
+
+ Class holderClass = param.getJavaType();
+ if (holderClass != null
+ && Holder.class.isAssignableFrom(holderClass)) {
+ int index = count;
+ // Use the parameter order if specified or just stick them to the end.
+ if (param.getOrder() != -1) {
+ index = param.getOrder();
} else {
-
- //now we don't have response so we are sending only ack
- SOAPEnvelope responseEnv = new SOAPEnvelope();
- SOAPBodyElement responseBody =
- new SOAPBodyElement(responseEnv.createName(""));
- RMHeaders resHeaders = new RMHeaders();
- resHeaders.setSequenceAcknowledgement(
- this.getSequenceAcknowledgement());
- resHeaders.toSoapEnvelop(responseEnv);
-
- // create ack soap enveploe to send
- AddressingHeaders addressingHeaders =
- rmMessge.getAddressingHeaders();
- Address from = addressingHeaders.getFrom().getAddress();
- ;
- To to, resTo;
- Address replyTo;
- resTo = new To(from);
- to = addressingHeaders.getTo();
- From resFrom = new From(new Address(to.toString()));
- addAddressingHeader(responseEnv, resFrom, resTo, null);
-
- //Message ID
- UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- URI msgIDURI = new URI("uuid:" + uuidGen.nextUUID());
- MessageID msgID = new MessageID(msgIDURI);
- msgID.toSOAPHeaderElement(responseEnv);
-
- //add action
- URI actionURI =
- new URI(Constants.WSRM_SEQUENCE_ACKNOWLEDGEMENT_ACTION);
- Action action = new Action(actionURI);
- action.toSOAPHeaderElement(responseEnv);
-
- // sending the ack
- Message responseMsg = new Message(responseEnv);
- Call call = new Call(resTo.toString());
- call.setRequestMessage(responseMsg);
- call.invoke();
-
+ count++;
}
-
- } catch (AxisFault e) {
-
- log.equals(e);
- } catch (Exception e) {
- log.error(e);
-
- }
- //can increase the value it have to be after invoke the methode
- lastProcessedMessageNumber++;
- //now u may process the next message
- //then check again
- processMessage();
+ // If it's already filled, don't muck with it
+ if (argValues[index] != null) {
+ continue;
+ }
+ argValues[index] = holderClass.newInstance();
+ // Store an RPCParam in the outs collection so we
+ // have an easy and consistent way to write these
+ // back to the client below
+ RPCParam p =
+ new RPCParam(param.getQName(), argValues[index]);
+ p.setParamDesc(param);
+ outs.add(p);
+ } /*else {
+ }*/
}
}
+ private void sendResponse(String methodName, Object responseObject, RMMessage rmMessge)
+ throws SOAPException
+ {
+ SOAPEnvelope responseEnv = new SOAPEnvelope();
+ SOAPBodyElement reponceBody =
+ new SOAPBodyElement(
+ responseEnv.createName(
+ methodName + "Response",
+ "xsd",
+ Constants.RESPONSE_NAME_SPACE));
+
+ Name name =
+ responseEnv.createName(
+ methodName + "Return",
+ null,
+ Constants.RESPONSE_NAME_SPACE);
+ reponceBody.addChildElement(name).addTextNode(
+ responseObject.toString());
+
+ responseEnv.addBodyElement(reponceBody);
+
+ String fromAddress =
+ rmMessge
+ .getAddressingHeaders()
+ .getFrom()
+ .getAddress()
+ .toString();
+ String replayToAddress = null;
+ replayToAddress =
+ rmMessge
+ .getAddressingHeaders()
+ .getReplyTo()
+ .getAddress()
+ .toString();
+
+ //two cases here
+ //1. send the response to the from address :- the from address and the reply-to address are the same;
+ // in this case we are going to send both the ack and the response in the same message.
+ //2. send the response to the reply-to address :- the from and reply-to addresses are different;
+ // in this case the response goes to reply-to and the ack goes to from
+
+ if (fromAddress.equals(replayToAddress)) {
+ //send the response to from (see above comment)
+ sendResponseToFrom(rmMessge, responseEnv);
+ } else {
+ //send the response to reply to (see above comment)
+ sendResponseToReplayTo(rmMessge, responseEnv);
+ }
+ }
+
+ private void sendOnlyAck(RMMessage rmMessge) throws Exception {
+ SOAPEnvelope responseEnv = new SOAPEnvelope();
+ SOAPBodyElement responseBody =
+ new SOAPBodyElement(responseEnv.createName(""));
+ RMHeaders resHeaders = new RMHeaders();
+ resHeaders.setSequenceAcknowledgement(
+ this.getSequenceAcknowledgement());
+ resHeaders.toSoapEnvelop(responseEnv);
+
+ // create ack soap envelope to send
+ AddressingHeaders addressingHeaders =
+ rmMessge.getAddressingHeaders();
+ Address from = addressingHeaders.getFrom().getAddress();
+ ;
+ To to, resTo;
+ Address replyTo;
+ resTo = new To(from);
+ to = addressingHeaders.getTo();
+ From resFrom = new From(new Address(to.toString()));
+ addAddressingHeader(responseEnv, resFrom, resTo, null);
+
+ //Message ID
+ UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
+ URI msgIDURI = new URI("uuid:" + uuidGen.nextUUID());
+ MessageID msgID = new MessageID(msgIDURI);
+ msgID.toSOAPHeaderElement(responseEnv);
+
+ //add action
+ URI actionURI =
+ new URI(Constants.WSRM_SEQUENCE_ACKNOWLEDGEMENT_ACTION);
+ Action action = new Action(actionURI);
+ action.toSOAPHeaderElement(responseEnv);
+
+ // sending the ack
+ Message responseMsg = new Message(responseEnv);
+ Call call = new Call(resTo.toString());
+ call.setRequestMessage(responseMsg);
+ call.invoke();
+ }
+
/**
* Method addAddressingHeader
- *
- * @param soapEnv
- * @param from
- * @param to
- * @param reployTo
- * @throws Exception
+ *
+ * @param soapEnv
+ * @param from
+ * @param to
+ * @param reployTo
+ * @throws Exception
*/
-
+
private void addAddressingHeader(
SOAPEnvelope soapEnv,
From from,
@@ -428,212 +463,212 @@
}
}
}
-
+
/**
* Method insertClientMessage
- *
- * @param message
+ *
+ * @param message
*/
-
+
public synchronized void insertClientMessage(RMMessage message) {
-
+
message.setMessageNumber(nextMessageNo);
messageList.put(new Long(nextMessageNo), message);
nextMessageNo++;
-
+
}
-
+
/**
* Method insertResponseMessage
- *
- * @param message
+ *
+ * @param message
*/
-
+
public synchronized void insertResponseMessage(RMMessage message) {
long msgNo =
message
- .getRMHeaders()
- .getSequence()
- .getMessageNumber()
- .getMessageNumber();
-
+ .getRMHeaders()
+ .getSequence()
+ .getMessageNumber()
+ .getMessageNumber();
+
if (messageList.get((new Long(msgNo))) == null) {
messageList.put((new Long(msgNo)), message);
nextMessageNo++;
-
+
}
if (msgNo > maxMessageNumber) {
maxMessageNumber = msgNo;
}
-
+
}
-
+
/**
* Method insertServerMessage
- *
- * @param message
+ *
+ * @param message
*/
-
+
public synchronized void insertServerMessage(RMMessage message) {
-
+
long msgNo =
message
- .getRMHeaders()
- .getSequence()
- .getMessageNumber()
- .getMessageNumber();
-
+ .getRMHeaders()
+ .getSequence()
+ .getMessageNumber()
+ .getMessageNumber();
+
if (messageList.get((new Long(msgNo))) == null) {
messageList.put((new Long(msgNo)), message);
nextMessageNo++;
processMessage();
-
+
}
if (msgNo > maxMessageNumber) {
maxMessageNumber = msgNo;
}
-
+
}
-
+
/**
* Method retrieveMessage
- *
- * @param messageNo
- * @return
+ *
+ * @param messageNo
+ * @return
*/
-
+
public RMMessage retrieveMessage(Long messageNo) {
RMMessage message;
message = (RMMessage) messageList.get(messageNo);
return message;
}
-
+
/**
* Method setSequenceIdentifier
- *
- * @param identifier
+ *
+ * @param identifier
*/
public void setSequenceIdentifier(Identifier identifier) {
sequenceIdentifier = identifier;
}
-
+
/**
* Method getNextMessageNo
*/
public long getNextMessageNo() {
return nextMessageNo;
}
-
+
/**
* Method setNextMessageNo
- *
+ *
* @param l
*/
public void setNextMessageNo(long l) {
nextMessageNo = l;
}
-
+
/**
* Method getSequenceAcknowledgement
- *
+ *
* @return SequenceAcknowledgement
- *
+ *
*/
-
+
public SequenceAcknowledgement getSequenceAcknowledgement() {
-
+
SequenceAcknowledgement seqAck = new SequenceAcknowledgement();
long min = 0;
long max = 0;
boolean gotMsgRage = false;
AcknowledgementRange ackRange;
-
+
if (maxMessageNumber > 0) {
-
+
for (long i = 1; i <= maxMessageNumber + 1; i++) {
-
+
if (messageList.get(new Long(i)) != null) {
-
+
if (!gotMsgRage) {
-
+
min = i;
gotMsgRage = true;
}
} else {
if (gotMsgRage) {
-
+
max = i - 1;
gotMsgRage = false;
ackRange = new AcknowledgementRange();
ackRange.setMinValue(min);
ackRange.setMaxValue(max);
seqAck.addAckRanges(ackRange);
-
+
}
}
}
}
seqAck.setIdentifier(this.getSequenceIdentifier());
-
+
return seqAck;
}
-
+
/**
* Method setSequenceAcknowledgement
- *
+ *
* @param seqAck
- *
+ *
*/
-
+
public void setSequenceAcknowledgement(SequenceAcknowledgement seqAck) {
if (seqAck.getAckRanges() != null) {
Iterator iterator = seqAck.getAckRanges().iterator();
if (iterator != null) {
AcknowledgementRange ackRange;
RMMessage rmMsg;
-
+
while (iterator.hasNext()) {
ackRange = (AcknowledgementRange) iterator.next();
for (long i = ackRange.getMinValue();
- i <= ackRange.getMaxValue();
- i++) {
+ i <= ackRange.getMaxValue();
+ i++) {
rmMsg = (RMMessage) messageList.get(new Long(i));
if (rmMsg != null) {
rmMsg.setAcknowledged(true);
}
-
+
}
}
}
-
+
}
-
+
}
-
+
/**
* Method setResponseMessage
- *
+ *
* @param resMsg
- *
+ *
*/
-
+
public void setResponseMessage(RMMessage resMsg) {
RMMessage rmMsg =
(RMMessage) messageList.get((new Long(resMsg.getMessageNumber())));
if (rmMsg != null) {
rmMsg.setResponseMessage(resMsg.getRequestMessage());
} else {
-
+
}
}
-
+
/**
* Method updateAckedMessages
- *
+ *
* @param acknowledgement
- *
+ *
*/
-
+
public void updateAckedMessages(SequenceAcknowledgement acknowledgement) {
List msgList = acknowledgement.getAckRanges();
Iterator ite = msgList.iterator();
@@ -642,7 +677,7 @@
if (range != null) {
long max = range.getMaxValue();
long min = range.getMinValue();
-
+
for (long i = min; i <= max; i++) {
RMMessage msg = (RMMessage) messageList.get(new Long(i));
msg.setAcknowledged(true);
@@ -650,73 +685,73 @@
}
}
}
-
+
/**
* Method getLastProcessedMessageNumber
- *
+ *
* @return long
- *
+ *
*/
public long getLastProcessedMessageNumber() {
return lastProcessedMessageNumber;
}
-
+
/**
* Method getMaxMessageNumber
- *
+ *
* @return long
- *
+ *
*/
public long getMaxMessageNumber() {
return maxMessageNumber;
}
-
+
/**
* Method getMessageList
- *
+ *
* @return Map
- *
+ *
*/
public Map getMessageList() {
return messageList;
}
-
+
/**
* Method setLastProcessedMessageNumber
- *
+ *
* @param l
- *
+ *
*/
public void setLastProcessedMessageNumber(long l) {
lastProcessedMessageNumber = l;
}
-
+
/**
* Method setMaxMessageNumber
- *
+ *
* @param l
- *
+ *
*/
public void setMaxMessageNumber(long l) {
maxMessageNumber = l;
}
-
+
/**
* Method sendResponseToFrom
- *
+ *
* @param rmMessge
* @param responseEnv
- *
+ *
*/
-
+
private void sendResponseToFrom(
RMMessage rmMessge,
SOAPEnvelope responseEnv) {
-
+
RMMessage rmResMessage = new RMMessage();
rmResMessage.setRequestMessage(rmMessge.getResponseMessage());
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
-
+
ClientMessageController clientMessageController =
ClientMessageController.getInstance();
try {
@@ -727,17 +762,17 @@
To to = addressingHeaders.getTo();
From resFrom = new From(new Address(to.toString()));
addAddressingHeader(responseEnv, resFrom, resTo, null);
-
+
if (responseSequenceIdentifier == null) {
- //no response sequence identifer
+ //no response sequence identifer
// two cases are here
// 1. can create a identifer server itself
// 2. have to send create sequence message to client
if (isClientDidReclamtion) {
- // case 2 above
- //if client did reclamtion when client sending request
+ // case 2 above
+ //if client did reclamtion when client sending request
// then we are going to create a new sequence
-
+
//create a sequence
RMHeaders createSeqRMHeaders = new RMHeaders();
CreateSequence createSeq = new CreateSequence();
@@ -745,7 +780,7 @@
SOAPEnvelope createSeqEnv = new SOAPEnvelope();
createSeqRMHeaders.toSoapEnvelop(createSeqEnv);
RMMessage createMsg = new RMMessage();
-
+
//to add message id
URI msgIDURI = new URI("uuid:" + uuidGen.nextUUID());
MessageID msgID = new MessageID(msgIDURI);
@@ -754,7 +789,7 @@
clientMessageController.storeMessage(createMsg);
ReplyTo createSeqReplyTo =
new ReplyTo(resFrom.getAddress());
-
+
//put neccessary headers
getInitialSOAPEnvelope(
createSeqEnv,
@@ -763,22 +798,22 @@
resTo,
createSeqReplyTo,
msgID);
- //send the message to client
+ //send the message to client
Call call = new Call(resTo.toString());
Message msg = new Message(createSeqEnv);
call.setRequestMessage(msg);
call.invoke();
-
+
//check for the response and get the identifier
for (int i = 0;
- i
- < org
- .apache
- .sandesha
- .Constants
- .SERVER_RESPONSE_CREATE_SEQUENCE_MAX_CHECK_COUNT;
- i++) {
-
+ i
+ < org
+ .apache
+ .sandesha
+ .Constants
+ .SERVER_RESPONSE_CREATE_SEQUENCE_MAX_CHECK_COUNT;
+ i++) {
+
Thread.sleep(
org
.apache
@@ -786,19 +821,19 @@
.Constants
.SERVER_RESPONSE_CREATE_SEQUENCE_CHECKING_INTERVAL);
if (createMsg.getResponseMessage() != null) {
-
+
RMHeaders createSeqResRMHeaders = new RMHeaders();
createSeqResRMHeaders.fromSOAPEnvelope(
createMsg
.getResponseMessage()
.getSOAPEnvelope());
if (createSeqResRMHeaders
- .getCreateSequenceResponse()
- != null) {
+ .getCreateSequenceResponse()
+ != null) {
responseSequenceIdentifier =
createSeqResRMHeaders
- .getCreateSequenceResponse()
- .getIdentifier();
+ .getCreateSequenceResponse()
+ .getIdentifier();
break;
} else {
////Exception;;;
@@ -806,22 +841,22 @@
}
}
}
-
+
} else {
-
- //no need to get the response sequence identifier from client send with server
+
+ //no need to get the response sequence identifier from client send with server
// own identifier
-
+
responseSequenceIdentifier = new Identifier();
responseSequenceIdentifier.setIdentifier(
"uuid:" + uuidGen.nextUUID());
}
-
+
}
//put the sequence in the sequence map
RMSequence rmResSeq =
clientMessageController.retrieveIfSequenceExists(
- responseSequenceIdentifier);
+ responseSequenceIdentifier);
if (rmResSeq == null) {
rmResSeq = new RMSequence();
rmResSeq.setSequenceIdentifier(responseSequenceIdentifier);
@@ -842,18 +877,18 @@
this.getSequenceAcknowledgement());
//get the envploye
resHeaders.toSoapEnvelop(responseEnv);
-
+
//to add message id
URI msgIDURI = new URI("uuid:" + uuidGen.nextUUID());
MessageID msgID = new MessageID(msgIDURI);
-
- //add relates to
+
+ //add relates to
AddressingHeaders resAddressingHaders = new AddressingHeaders();
resAddressingHaders.addRelatesTo(
addressingHeaders.getMessageID().toString(),
- (new QName("wsa", Constants.WS_ADDRESSING_NAMESPACE)));
+ (new QName("wsa", Constants.WS_ADDRESSING_NAMESPACE)));
resAddressingHaders.toEnvelope(responseEnv);
-
+
getInitialSOAPEnvelope(
responseEnv,
"wsrm:Response",
@@ -861,19 +896,19 @@
resTo,
null,
msgID);
-
- //send the message
+
+ //send the message
Call call =
new Call(
- rmMessge
- .getAddressingHeaders()
- .getFrom()
- .getAddress()
- .toString());
+ rmMessge
+ .getAddressingHeaders()
+ .getFrom()
+ .getAddress()
+ .toString());
Message responseMsg = new Message(responseEnv);
call.setRequestMessage(responseMsg);
call.invoke();
-
+
// set AckRequested
RMHeaders rmAckReqHeader = new RMHeaders();
AckRequested ackReq = new AckRequested();
@@ -881,17 +916,17 @@
rmAckReqHeader.setAckRequest(ackReq);
rmAckReqHeader.toSoapEnvelop(responseEnv);
call.setRequestMessage(responseMsg);
-
+
//wait for ack and resend
for (int i = 0;
- i
- < org
- .apache
- .sandesha
- .Constants
- .MAXIMAM_SERVER_RETRANSMISION_COUNT;
- i++) {
-
+ i
+ < org
+ .apache
+ .sandesha
+ .Constants
+ .MAXIMAM_SERVER_RETRANSMISION_COUNT;
+ i++) {
+
if (!rmMessge.isAcknowledged()) {
call.invoke();
} else {
@@ -908,30 +943,30 @@
// TODO Auto-generated catch block
log.error(e5);
}
-
+
}
-
+
/**
* Method sendResponseToReplayTo
- *
+ *
* @param rmMessge
* @param responseEnv
- *
+ *
*/
-
+
private void sendResponseToReplayTo(
RMMessage rmMessge,
SOAPEnvelope responseEnv) {
//here we are sending reponse to reply to and
- //ack to from
+ //ack to from
RMMessage rmResMessage = new RMMessage();
rmResMessage.setRequestMessage(rmMessge.getResponseMessage());
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
-
+
ClientMessageController clientMessageController =
ClientMessageController.getInstance();
try {
-
+
AddressingHeaders addressingHeaders =
rmMessge.getAddressingHeaders();
Address from = addressingHeaders.getFrom().getAddress();
@@ -940,7 +975,7 @@
From resFrom = new From(new Address(to.toString()));
addAddressingHeader(responseEnv, resFrom, resTo, null);
Address replyTo = addressingHeaders.getReplyTo().getAddress();
-
+
if (responseSequenceIdentifier == null) {
if (isClientDidReclamtion) {
//see send response to from
@@ -949,14 +984,14 @@
createSeqRMHeaders.setCreateSequence(createSeq);
SOAPEnvelope createSeqEnv = new SOAPEnvelope();
createSeqRMHeaders.toSoapEnvelop(createSeqEnv);
-
+
RMMessage createMsg = new RMMessage();
-
+
URI msgIDURI = new URI("uuid:" + uuidGen.nextUUID());
MessageID msgID = new MessageID(msgIDURI);
ReplyTo createSeqReplyTo =
new ReplyTo(resFrom.getAddress());
-
+
createMsg.setMessageID(msgID);
clientMessageController.storeMessage(createMsg);
getInitialSOAPEnvelope(
@@ -968,16 +1003,16 @@
msgID);
Call call = new Call(resTo.toString());
call.invoke(createSeqEnv);
-
+
for (int i = 0;
- i
- < org
- .apache
- .sandesha
- .Constants
- .SERVER_RESPONSE_CREATE_SEQUENCE_MAX_CHECK_COUNT;
- i++) {
-
+ i
+ < org
+ .apache
+ .sandesha
+ .Constants
+ .SERVER_RESPONSE_CREATE_SEQUENCE_MAX_CHECK_COUNT;
+ i++) {
+
Thread.sleep(
org
.apache
@@ -991,39 +1026,39 @@
.getResponseMessage()
.getSOAPEnvelope());
if (createSeqResRMHeaders
- .getCreateSequenceResponse()
- != null) {
+ .getCreateSequenceResponse()
+ != null) {
responseSequenceIdentifier =
createSeqResRMHeaders
- .getCreateSequenceResponse()
- .getIdentifier();
+ .getCreateSequenceResponse()
+ .getIdentifier();
} else {
////Exception;;;
//TODO:Have to send the fault when there are no create sequence responses.
-
+
}
break;
}
}
-
+
} else {
-
+
responseSequenceIdentifier = new Identifier();
responseSequenceIdentifier.setIdentifier(
"uuid:" + uuidGen.nextUUID());
}
-
+
}
RMSequence rmResSeq =
clientMessageController.retrieveIfSequenceExists(
- responseSequenceIdentifier);
+ responseSequenceIdentifier);
if (rmResSeq == null) {
rmResSeq = new RMSequence();
rmResSeq.setSequenceIdentifier(responseSequenceIdentifier);
clientMessageController.storeSequence(rmResSeq);
}
rmResSeq.insertClientMessage(rmResMessage);
-
+
Sequence resSeq = new Sequence();
resSeq.setIdentifier(responseSequenceIdentifier);
MessageNumber resMsgNo = new MessageNumber();
@@ -1031,11 +1066,11 @@
resSeq.setMessageNumber(resMsgNo);
RMHeaders resHeaders = new RMHeaders();
resHeaders.setSequence(resSeq);
-
+
RMHeaders ackResHeaders = new RMHeaders();
-
+
resHeaders.setSequence(rmMessge.getRMHeaders().getSequence());
-
+
ackResHeaders.setSequenceAcknowledgement(
this.getSequenceAcknowledgement());
resHeaders.toSoapEnvelop(responseEnv);
@@ -1050,11 +1085,11 @@
null,
msgID);
AddressingHeaders resAddressingHaders = new AddressingHeaders();
- //add relates to
+ //add relates to
resAddressingHaders.addRelatesTo(
addressingHeaders.getMessageID().toString(),
- (new QName("wsa", Constants.WS_ADDRESSING_NAMESPACE)));
-
+ (new QName("wsa", Constants.WS_ADDRESSING_NAMESPACE)));
+
resAddressingHaders.toEnvelope(responseEnv);
//to send the response
Message responseMsg = new Message(responseEnv);
@@ -1074,22 +1109,22 @@
Message ackMsg = new Message(ackEnv);
Call ackCall =
new Call(
- rmMessge
- .getAddressingHeaders()
- .getFrom()
- .getAddress()
- .toString());
+ rmMessge
+ .getAddressingHeaders()
+ .getFrom()
+ .getAddress()
+ .toString());
ackCall.setRequestMessage(ackMsg);
ackCall.invoke();
-
+
//send respponse and wait for ack if not retranmiss
Call call =
new Call(
- rmMessge
- .getAddressingHeaders()
- .getFrom()
- .getAddress()
- .toString());
+ rmMessge
+ .getAddressingHeaders()
+ .getFrom()
+ .getAddress()
+ .toString());
call.setRequestMessage(responseMsg);
call.invoke();
//set AckRequested
@@ -1099,85 +1134,85 @@
rmAckReqHeader.setAckRequest(ackReq);
rmAckReqHeader.toSoapEnvelop(responseEnv);
call.setRequestMessage(responseMsg);
-
+
for (int i = 0;
- i
- < org
- .apache
- .sandesha
- .Constants
- .MAXIMAM_SERVER_RETRANSMISION_COUNT;
- i++) {
+ i
+ < org
+ .apache
+ .sandesha
+ .Constants
+ .MAXIMAM_SERVER_RETRANSMISION_COUNT;
+ i++) {
if (!rmMessge.isAcknowledged()) {
call.invoke();
} else {
break;
}
-
+
Thread.sleep(
org
.apache
.sandesha
.Constants
.SERVER_RETRANSMISION_INTERVAL);
-
+
}
} catch (Exception e) {
log.error(e);
}
-
+
}
-
+
/**
* Method getResponseSequenceIdentifier
- *
+ *
* @return Identifier
*/
public Identifier getResponseSequenceIdentifier() {
return responseSequenceIdentifier;
}
-
+
/**
* Method setResponseSequenceIdentifier
- *
+ *
* @param identifier
*/
public void setResponseSequenceIdentifier(Identifier identifier) {
responseSequenceIdentifier = identifier;
}
-
+
/**
* Method isClientDidReclamtion
- *
+ *
* @return boolean
*/
public boolean isClientDidReclamtion() {
return isClientDidReclamtion;
}
-
+
/**
* Method setClientDidReclamtion
- *
- * @param b
- *
+ *
+ * @param b
+ *
*/
public void setClientDidReclamtion(boolean b) {
isClientDidReclamtion = b;
}
-
+
/**
* Method setClientDidReclamtion
- *
+ *
* @param reqSOAPEnvelop
* @param strAction
* @param from
* @param to
* @param replyTo
* @param msgID
- *
+ *
* @return SOAPEnvelope
*/
-
+
private SOAPEnvelope getInitialSOAPEnvelope(
SOAPEnvelope reqSOAPEnvelop,
String strAction,
@@ -1185,9 +1220,9 @@
To to,
ReplyTo replyTo,
MessageID msgID) {
-
+
try {
-
+
reqSOAPEnvelop.addNamespaceDeclaration(
Constants.NS_PREFIX_RM,
Constants.NS_URI_RM);
@@ -1197,7 +1232,7 @@
reqSOAPEnvelop.addNamespaceDeclaration(
Constants.WSU_PREFIX,
Constants.WSU_NS);
-
+
URI actionURI = new URI(strAction);
Action action = new Action(actionURI);
action.toSOAPHeaderElement(reqSOAPEnvelop);
@@ -1207,17 +1242,17 @@
if (replyTo != null) {
replyTo.toSOAPHeaderElement(reqSOAPEnvelop);
}
-
+
to.toSOAPHeaderElement(reqSOAPEnvelop);
-
+
msgID.toSOAPHeaderElement(reqSOAPEnvelop);
-
+
} catch (Exception e3) {
// TODO Auto-generated catch block
log.error(e3);
}
-
+
return reqSOAPEnvelop;
}
-
+
}