You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/10/11 12:58:37 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server Sender.java RMInvoker.java ServerStorageManager.java RMMessageProcessorIdentifier.java
jaliya 2004/10/11 03:58:37
Modified: sandesha/src/org/apache/sandesha/server Sender.java
RMInvoker.java ServerStorageManager.java
RMMessageProcessorIdentifier.java
Log:
A few small modifications, mainly code formatting.
Revision Changes Path
1.9 +6 -10 ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
Index: Sender.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- Sender.java 9 Oct 2004 08:50:23 -0000 1.8
+++ Sender.java 11 Oct 2004 10:58:37 -0000 1.9
@@ -70,11 +70,7 @@
switch (rmMessageContext.getMessageType()) {
case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST: {
try {
-
System.out.println("CREATE SEQ REQUESET");
- //storageManager.setApprovedOutSequence(
- // "abcdefghijk", "1233abcdefghijk");
-
//Send the message.
//may get the reply back to here.
Service service = new Service();
@@ -85,7 +81,7 @@
.getOutGoingAddress());
if (rmMessageContext.getMsgContext()
.getRequestMessage() == null)
- System.out.println("It is null man");
+ System.out.println("NULL REQUEST MESSAGE");
call.setRequestMessage(rmMessageContext
.getMsgContext().getRequestMessage());
@@ -134,14 +130,14 @@
//No response and we can just close the connection
try {
System.out.println("CREATE SEQ RESPONSE");
- System.out
- .println("******** Sending the message**************");
+ //System.out
+ // .println("******** Sending the message**************");
System.out
.println(rmMessageContext.getMsgContext()
.getResponseMessage()
.getSOAPPartAsString());
- System.out
- .println("******** Sending the message**************");
+ //System.out
+ // .println("******** Sending the message**************");
Service service = new Service();
Call call = (Call) service.createCall();
System.out.println(rmMessageContext
@@ -150,7 +146,7 @@
.getOutGoingAddress());
if (rmMessageContext.getMsgContext()
.getResponseMessage() == null)
- System.out.println("It is null man");
+ System.out.println("NULL RESPONSE MESSAGE");
call.setRequestMessage(rmMessageContext
.getMsgContext().getResponseMessage());
1.4 +15 -86 ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
Index: RMInvoker.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RMInvoker.java 9 Oct 2004 08:50:23 -0000 1.3
+++ RMInvoker.java 11 Oct 2004 10:58:37 -0000 1.4
@@ -48,17 +48,14 @@
public void run() {
while (true) {
try {
-
System.out
.println("RMInvoker THREAD IS SLEEPING -----------000----------\n");
//Sleep for Constants.RMINVOKER_SLEEP_TIME.
//Currently the RMInvoker is a single thread, but this needs to
- // be
- //replaced by a thread pool so that the performance can be
+ // be replaced by a thread pool so that the performance can be
// improved.
Thread.sleep(Constants.RMINVOKER_SLEEP_TIME);
-
//Get the next message to invoke.
//storeManager is responsible of giving the correct message to
// invoke.
@@ -67,7 +64,6 @@
//If not return is null then proceed with invokation.
if (rmMessageContext != null) {
-
//Currently RPCProvider is used as the default provider and
// this is
//used to actually invoke the service.
@@ -104,19 +100,10 @@
boolean firstMsgOfResponseSeq = !storageManager
.isResponseSequenceExist(rmMessageContext
.getSequenceID());
-
- storageManager.insertResponseMessage(rmMessageContext); //This
- // will
- // automatically
- // create
- // a
- // response
- // requence
- // and
- // add
- // the
- // message.
+ storageManager.insertResponseMessage(rmMessageContext);
+ //This will automatically create a response requence
+ // and add the message.
//Need to decide whether the server needs resource
// reclamtion or not.
//This can be a property set by adminstrator and may be
@@ -131,16 +118,15 @@
System.out.println("NO RESPONSE SEQUENCE");
RMMessageContext rmMsgContext = new RMMessageContext();
rmMessageContext.copyContents(rmMsgContext);
-
+
MessageContext msgContext = new MessageContext(
rmMessageContext.getMsgContext()
.getAxisEngine());
-
- // RMMessageContext.copyMessageContext(
- // rmMessageContext.getMsgContext(),
- // msgContext);
+ // RMMessageContext.copyMessageContext(
+ // rmMessageContext.getMsgContext(),
+ // msgContext);
//Set this new msgContext to the rmMsgContext.
-
+
rmMessageContext.setMsgContext(msgContext);
rmMsgContext
@@ -148,18 +134,17 @@
UUIDGen uuid = UUIDGenFactory.getUUIDGen();
String id = uuid.nextUUID();
- //String id = "abcdefghijk";
-
- //Need to add "uuid" or we can always remove this part
+
+ //Need to add "uuid" or we can always remove this
+ // part
//Posible Problem
//TODO
storageManager.setTemporaryOutSequence(rmMsgContext
- .getSequenceID(), "uuid:"+id);
+ .getSequenceID(), "uuid:" + id);
SOAPEnvelope createSequenceEnvelope = EnvelopeCreator
.createCreateSequenceEnvelope(id,
rmMsgContext, Constants.SERVER);
-
rmMsgContext.getMsgContext().setRequestMessage(
new Message(createSequenceEnvelope));
@@ -168,73 +153,17 @@
.getAddressingHeaders().getReplyTo()
.getAddress().toString());
-
-
- rmMsgContext.setMessageID("uuid:"+id);
+ rmMsgContext.setMessageID("uuid:" + id);
storageManager
.addCreateSequenceRequest(rmMsgContext);
- System.out.println("END OF IF");
}
+ //Uncomment this section to print the queues.
ServerQueue sq = ServerQueue.getInstance();
sq.displayPriorityQueue();
sq.displayOutgoingMap();
sq.displayIncomingMap();
-
- //we need to check whether we have this sequence in the
- // response queue.
- //storageManager.isResponseSequenceExist();
- //If this is false then we need to create a new create
- // sequence request messag
- //and add it to the queue.
- /*
- * if(storageManager.isResponseSequenceExist(rmMessageContext.getSequenceID())){
- *
- * RMMessageContext rmMsgContext= new
- * RMMessageContext();
- * rmMessageContext.copyContents(rmMsgContext);
- * rmMsgContext.setMessageType(Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST);
- *
- * //Use UUIDGen to genrrate a temp messageID
- *
- * //We will create a new out sequence with this
- * right??.
- * storageManager.setTemporaryOutSequence(rmMessageContext.getSequenceID(),"abcdefghijklmnop");
- *
- * SOAPEnvelope
- * createSequenceEnvelop=EnvelopeCreator.createCreateSequenceRequestEnvelope(String
- * messageID);
- *
- * MessageContext msgContext = new
- * MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
- * RMMessageContext.copyMessageContext(rmMessageContext.getMsgContext(),msgContext);
- *
- * rmMsgContext.getMsgContext().setRequestMessage(new
- * Message(createSequenceEnvelop));
- *
- * storageManager.addCreateSequenceRequest(rmMsgContext); }
- */
-
- //At this point we don't know the new sequence that we
- // are going to use for sending
- //the response messages. So we will just add the
- // response message as it is,
- //without RM or Addressing headers
-
- /*
- * //@@@@@@@@@@ ----THIS PART IS JUST FOR
- * TESTING-----@@@@@@@@@@@@@@@@@@@@@ //If we enable this
- * part, we can just send responses asynchronously.
- *
- * //We have to get this using create sequence from the
- * client. //This should be able to switched off
- * depending on the server config.
- * storageManager.setTemporaryOutSequence(rmMessageContext.getSequenceID(),"abcdefghijklmnop");
- * storageManager.setApprovedOutSequence("abcdefghijklmnop","qrstuvwxyz1234");
- *
- * //@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
- */
}
}
1.6 +283 -294 ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ServerStorageManager.java 9 Oct 2004 08:50:23 -0000 1.5
+++ ServerStorageManager.java 11 Oct 2004 10:58:37 -0000 1.6
@@ -37,317 +37,306 @@
public class ServerStorageManager implements IStorageManager {
- protected static Log log =
- LogFactory.getLog(ServerStorageManager.class.getName());
-
-
- private String tempSeqId=null; // used by getNextMessageToProcess();
-
-
- /**
- * A very important method.
- * Makes life easy for the thread or thread pool that is using this.
- * Every thread just have to create an instance of ServerStorageManager and
- * keep calling getNextMessageToProcess() and processing messages.
- * The method will try to give the messages from the same sequence id.
- * But if that doesnt hv processable messages it will go 4 a new sequence.
- *
- */
- public RMMessageContext getNextMessageToProcess() {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- if(tempSeqId==null)
- tempSeqId = accessor.getRandomSeqIdToProcess();
-
- if(tempSeqId==null)
- return null;
-
- RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
-
- if(nextMsg==null){
- tempSeqId = accessor.getRandomSeqIdToProcess();
- nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
- }
-
- return nextMsg;
- }
+ protected static Log log = LogFactory.getLog(ServerStorageManager.class
+ .getName());
+ private String tempSeqId = null; // used by getNextMessageToProcess();
+ /**
+ * A very important method. Makes life easy for the thread or thread pool
+ * that is using this. Every thread just have to create an instance of
+ * ServerStorageManager and keep calling getNextMessageToProcess() and
+ * processing messages. The method will try to give the messages from the
+ * same sequence id. But if that doesnt hv processable messages it will go 4
+ * a new sequence.
+ *
+ */
+ public RMMessageContext getNextMessageToProcess() {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ if (tempSeqId == null)
+ tempSeqId = accessor.getRandomSeqIdToProcess();
+
+ if (tempSeqId == null)
+ return null;
+
+ RMMessageContext nextMsg = accessor
+ .getNextMsgContextToProcess(tempSeqId);
+
+ if (nextMsg == null) {
+ tempSeqId = accessor.getRandomSeqIdToProcess();
+ nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
+ }
+
+ return nextMsg;
+ }
+
+ public void setAcknowledged(String seqID, long msgNumber) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ //TODO decide this in implementing the ServerSender.
+
+ accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
+ }
+
+ public void init() {
+ //:TODO Complete
+ }
+
+ /**
+ * This will insert the message to the sequence in the InqQueue identified
+ * by sequenceId. If sequence is not present will create a new one.
+ */
+ public void insertRequestMessage(RMMessageContext rmMessageContext) {
+
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ //No need to use this property
+ //RMHeaders rmHeaders =
+ // (RMHeaders) rmMessageContext.getMsgContext().getProperty(
+ // org.apache.sandesha.Constants.ENV_RM_REQUEST_HEADERS);
- public void setAcknowledged(String seqID,long msgNumber) {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- //TODO decide this in implementing the ServerSender.
-
- accessor.moveOutgoingMessageToBin(seqID,new Long(msgNumber));
- }
-
- public void init() {
- //:TODO Complete
- }
+ RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
+ String sequenceId = rmHeaders.getSequence().getIdentifier()
+ .getIdentifier();
+
+ boolean exists = accessor.isIncomingSequenceExists(sequenceId);
+
+ if (!exists)
+ addSequence(sequenceId); //Creating new sequence
+
+ //TODO: add getRmHeaders method to MessageContext
+ long messageNumber = rmHeaders.getSequence().getMessageNumber()
+ .getMessageNumber();
+
+ if (messageNumber <= 0)
+ return; //TODO: throw some exception
+
+ Long msgNo = new Long(messageNumber);
+ accessor.addMessageToIncomingSequence(sequenceId, msgNo,
+ rmMessageContext);
+ }
+
+ public void insertResponseMessage(RMMessageContext msg) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ String sequenceId = msg.getSequenceID();
+ //RMHeaders rmHeaders =msg.getRMHeaders();
+ //String sequenceId =
+ // rmHeaders.getSequence().getIdentifier().getIdentifier();
+
+ boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
+ if (!exists)
+ accessor.addOutgoingSequence(sequenceId);
+
+ accessor.addMessageToOutgoingSequence(sequenceId, msg);
+
+ }
/**
- * This will insert the message to the sequence in the InqQueue
- * identified by sequenceId.
- * If sequence is not present will create a new one.
- */
- public void insertRequestMessage(RMMessageContext rmMessageContext) {
-
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- //No need to use this property
- //RMHeaders rmHeaders =
- // (RMHeaders) rmMessageContext.getMsgContext().getProperty(
- // org.apache.sandesha.Constants.ENV_RM_REQUEST_HEADERS);
-
- RMHeaders rmHeaders =rmMessageContext.getRMHeaders();
-
- String sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier();
-
- boolean exists = accessor.isIncomingSequenceExists(sequenceId);
-
- if (!exists)
- addSequence(sequenceId); //Creating new sequence
-
-
- //TODO: add getRmHeaders method to MessageContext
- long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
-
- if(messageNumber<=0)
- return; //TODO: throw some exception
-
- Long msgNo = new Long(messageNumber);
- accessor.addMessageToIncomingSequence(sequenceId,msgNo,rmMessageContext);
- }
-
-
- public void insertResponseMessage(RMMessageContext msg){
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- String sequenceId = msg.getSequenceID();
- //RMHeaders rmHeaders =msg.getRMHeaders();
- //String sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier();
-
- boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
- if(!exists)
- accessor.addOutgoingSequence(sequenceId);
-
- accessor.addMessageToOutgoingSequence(sequenceId,msg);
-
+ * Used to find out weather the sequence with this id has already been
+ * created.
+ */
+ public boolean isSequenceExist(String sequenceID) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ return accessor.isIncomingSequenceExists(sequenceID);
+ }
+
+ public boolean isResponseSequenceExist(String sequenceID) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ //return accessor.isIncomingSequenceExists(sequenceID);
+ return accessor.isOutgoingSequenceExists(sequenceID);
+ }
+
+ /**
+ * This is used to get a random message from the out queue Basically server
+ * sender will use this.
+ */
+ public RMMessageContext getNextMessageToSend() {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ RMMessageContext msg;
+
+ msg = accessor.getNextPriorityMessageContextToSend();
+
+ if (msg == null)
+ msg = accessor.getNextOutgoingMsgContextToSend();
+
+ return msg;
}
-
+
/**
- * Used to find out weather the sequence with this id
- * has already been created.
+ * This is used to add a new message to the out queue. Will be used by
+ * various processors.
*/
- public boolean isSequenceExist(String sequenceID) {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- return accessor.isIncomingSequenceExists(sequenceID);
- }
-
- public boolean isResponseSequenceExist(String sequenceID) {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- //return accessor.isIncomingSequenceExists(sequenceID);
- return accessor.isOutgoingSequenceExists(sequenceID);
- }
-
+ /*
+ * public void addMessageToOutQueue(RMMessageContext rmMessageContext) {
+ * IServerDAO accessor = ServerDAOFactory.getStorageAccessor(
+ * Constants.SERVER_QUEUE_ACCESSOR); boolean result =
+ * accessor.addOutQueueMessage(rmMessageContext);
+ *
+ * if(!result) log.error("Message was not added to the out queue"); }
+ */
+
/**
- * This is used to get a random message from the out queue
- * Basically server sender will use this.
+ * Will be used to add a new Sequence Hash to the In Queue.
*/
- public RMMessageContext getNextMessageToSend() {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- RMMessageContext msg;
-
- msg = accessor.getNextPriorityMessageContextToSend();
-
- if(msg==null)
- msg = accessor.getNextOutgoingMsgContextToSend();
-
- return msg;
- }
+ public void addSequence(String sequenceId) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ boolean result = accessor.addIncomingSequence(sequenceId);
+
+ if (!result)
+ log.error("Sequence was not created correcly in the in queue");
+ }
/**
- * This is used to add a new message to the out queue.
- * Will be used by various processors.
+ * This gives a sorted(by keys) map of messageIds present for a sequence.
+ * This will be used to send Acks.
+ */
+ public Map getListOfMessageNumbers(String sequenceID) {
+
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
+
+ Iterator it = st.iterator();
+
+ //To find the largest id present
+ long largest = 0;
+ while (it.hasNext()) {
+ Long key = (Long) it.next();
+ if (key == null)
+ continue;
+
+ long l = key.longValue();
+ if (l > largest)
+ largest = l;
+ }
+
+ HashMap results = new HashMap();
+ //Add Keys to the results in order.
+ long currentPosition = 1;
+ for (long l = 1; l <= largest; l++) {
+ boolean present = st.contains(new Long(l));
+ if (present) {
+ results.put(new Long(currentPosition), new Long(l));
+ currentPosition++;
+ }
+ }
+ return results;
+ }
+
+ public boolean isMessageExist(String sequenceID, long messageNumber) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ return accessor.isIncomingMessageExists(sequenceID, new Long(
+ messageNumber));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.sandesha.IStorageManager#addCreateSequenceResponse(org.apache.sandesha.RMMessageContext)
+ */
+ public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
+ addPriorityMessage(rmMessageContext);
+ }
+
+ public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
+ addPriorityMessage(rmMessageContext);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.sandesha.IStorageManager#addAcknowledgement(org.apache.sandesha.RMMessageContext)
+ */
+ public void addAcknowledgement(RMMessageContext rmMessageContext) {
+ addPriorityMessage(rmMessageContext);
+ }
+
+ private void addPriorityMessage(RMMessageContext msg) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ accessor.addPriorityMessage(msg);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.sandesha.IStorageManager#getNextResponseMessageToSend()
*/
- /*public void addMessageToOutQueue(RMMessageContext rmMessageContext) {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- boolean result = accessor.addOutQueueMessage(rmMessageContext);
-
- if(!result)
- log.error("Message was not added to the out queue");
- }*/
-
- /**
- * Will be used to add a new Sequence Hash to the
- * In Queue.
- */
- public void addSequence(String sequenceId) {
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- boolean result = accessor.addIncomingSequence(sequenceId);
-
- if(!result)
- log.error("Sequence was not created correcly in the in queue");
- }
-
- /**
- * This gives a sorted(by keys) map of messageIds present for a
- * sequence. This will be used to send Acks.
- */
- public Map getListOfMessageNumbers(String sequenceID){
-
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
-
- Iterator it = st.iterator();
-
- //To find the largest id present
- long largest=0;
- while(it.hasNext()){
- Long key = (Long) it.next();
- if(key==null)
- continue;
-
- long l = key.longValue();
- if(l>largest)
- largest = l;
- }
-
-
- HashMap results = new HashMap();
- //Add Keys to the results in order.
- long currentPosition=1;
- for(long l=1;l<=largest;l++){
- boolean present = st.contains(new Long(l));
- if(present){
- results.put(new Long(currentPosition),new Long(l));
- currentPosition++;
- }
- }
- return results;
- }
-
- public boolean isMessageExist(String sequenceID,long messageNumber){
- IServerDAO accessor =
- ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
- return accessor.isIncomingMessageExists(sequenceID,new Long(messageNumber));
- }
-
- /* (non-Javadoc)
- * @see org.apache.sandesha.IStorageManager#addCreateSequenceResponse(org.apache.sandesha.RMMessageContext)
- */
- public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
- addPriorityMessage(rmMessageContext);
- }
-
- public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
- addPriorityMessage(rmMessageContext);
- }
-
- /* (non-Javadoc)
- * @see org.apache.sandesha.IStorageManager#addAcknowledgement(org.apache.sandesha.RMMessageContext)
- */
- public void addAcknowledgement(RMMessageContext rmMessageContext) {
- addPriorityMessage(rmMessageContext);
- }
-
- private void addPriorityMessage(RMMessageContext msg){
- IServerDAO accessor = ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- accessor.addPriorityMessage(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.sandesha.IStorageManager#getNextResponseMessageToSend()
- */
- /*public RMMessageContext getNextResponseMessageToSend() {
- // TODO Auto-generated method stub
- return null;
- }*/
-
-
- //Simple method to sort an object array.
- /*private Object[] sortObjArray(Object[] objs){
-
- Object temp;
- for(int i=0;i<objs.length;i++){
- for(int j=(i+1);j<objs.length;j++){
- long l1 = ((Long) objs[i]).longValue();
- long l2 = ((Long) objs[j]).longValue();
-
- if(l1>l2){
- //swaping
- temp=objs[i];
- objs[i]=objs[j];
- objs[j]=temp;
- }
- }
- }
-
- return objs;
- }*/
-
-
- /* (non-Javadoc)
- * @see org.apache.sandesha.IStorageManager#setOutSequence(java.lang.String, java.lang.String)
- */
- public void setTemporaryOutSequence(String sequenceId,String outSequenceId) {
- IServerDAO accessor = ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- accessor.setOutSequence(sequenceId,outSequenceId);
- accessor.setOutSequenceApproved(sequenceId,false);
- }
-
- public boolean setApprovedOutSequence(String oldOutsequenceId,String newOutSequenceId){
- IServerDAO accessor = ServerDAOFactory.getStorageAccessor(
- Constants.SERVER_QUEUE_ACCESSOR);
-
- boolean done = false;
- String sequenceID = accessor.getSequenceOfOutSequence(oldOutsequenceId);
-
- if(sequenceID==null){
- System.out.println("REOVING REMOVING REMOVING REMO " +oldOutsequenceId);
- return false;
- }
- System.out.println("REOVING REMOVING " +oldOutsequenceId);
- accessor.setOutSequence(sequenceID,newOutSequenceId);
- accessor.setOutSequenceApproved(sequenceID,true);
-
- //Deleting create sequence message from the priority queue.
- accessor.removeCreateSequenceMsg(oldOutsequenceId);
- return true;
- }
+ /*
+ * public RMMessageContext getNextResponseMessageToSend() { // TODO
+ * Auto-generated method stub return null; }
+ */
+
+ //Simple method to sort an object array.
+ /*
+ * private Object[] sortObjArray(Object[] objs){
+ *
+ * Object temp; for(int i=0;i <objs.length;i++){ for(int j=(i+1);j
+ * <objs.length;j++){ long l1 = ((Long) objs[i]).longValue(); long l2 =
+ * ((Long) objs[j]).longValue();
+ *
+ * if(l1>l2){ //swaping temp=objs[i]; objs[i]=objs[j]; objs[j]=temp; } } }
+ *
+ * return objs; }
+ */
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.sandesha.IStorageManager#setOutSequence(java.lang.String,
+ * java.lang.String)
+ */
+ public void setTemporaryOutSequence(String sequenceId, String outSequenceId) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ accessor.setOutSequence(sequenceId, outSequenceId);
+ accessor.setOutSequenceApproved(sequenceId, false);
+ }
+
+ public boolean setApprovedOutSequence(String oldOutsequenceId,
+ String newOutSequenceId) {
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+
+ boolean done = false;
+ String sequenceID = accessor.getSequenceOfOutSequence(oldOutsequenceId);
+
+ if (sequenceID == null) {
+ // System.out.println("REOVING REMOVING REMOVING REMO "
+ // +oldOutsequenceId);
+ return false;
+ }
+ System.out.println("REOVING REMOVING " + oldOutsequenceId);
+ accessor.setOutSequence(sequenceID, newOutSequenceId);
+ accessor.setOutSequenceApproved(sequenceID, true);
+
+ //Deleting create sequence message from the priority queue.
+ accessor.removeCreateSequenceMsg(oldOutsequenceId);
+ return true;
+ }
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.sandesha.IStorageManager#getNextMessageNumber(java.lang.String)
*/
public long getNextMessageNumber(String sequenceID) {
- IServerDAO accessor = ServerDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
- long l = accessor.getNextOutgoingMessageNumber (sequenceID);
+ IServerDAO accessor = ServerDAOFactory
+ .getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
+ long l = accessor.getNextOutgoingMessageNumber(sequenceID);
return l;
}
-}
+}
\ No newline at end of file
1.7 +11 -4 ws-fx/sandesha/src/org/apache/sandesha/server/RMMessageProcessorIdentifier.java
Index: RMMessageProcessorIdentifier.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMMessageProcessorIdentifier.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- RMMessageProcessorIdentifier.java 9 Oct 2004 08:50:23 -0000 1.6
+++ RMMessageProcessorIdentifier.java 11 Oct 2004 10:58:37 -0000 1.7
@@ -23,16 +23,24 @@
import org.apache.sandesha.ws.rm.RMHeaders;
/**
- * @author
+ * @author
*/
public class RMMessageProcessorIdentifier {
-
+ /**
+ * This method will identify the messages. Messages specific to the
+ * reliablility are identified using the action. Request messages are
+ * identified using the message number property.
+ *
+ * @param rmMessageContext
+ * @param storageManager
+ * @return
+ */
public static IRMMessageProcessor getMessageProcessor(
RMMessageContext rmMessageContext, IStorageManager storageManager) {
AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders();
RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
-
+
if (addrHeaders.getAction().toString().equals(
Constants.ACTION_CREATE_SEQUENCE)) {
return new CreateSequenceProcessor(storageManager);
@@ -44,7 +52,6 @@
return new TerminateSequenceProcessor(storageManager);
} else if ((rmHeaders.getSequenceAcknowledgement() != null)
|| (rmHeaders.getSequence().getMessageNumber() != null)) {
-
return new CompositeProcessor(storageManager);
} else
return new FaultProcessor(storageManager);