You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/09/13 13:26:51 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server/queue QueueException.java ResponseSequenceHash.java SequenceHash.java ServerQueue.java
jaliya 2004/09/13 04:26:51
Modified: sandesha/src/org/apache/sandesha/server/queue
QueueException.java ResponseSequenceHash.java
SequenceHash.java ServerQueue.java
Log:
Formatted code
Revision Changes Path
1.2 +6 -7 ws-fx/sandesha/src/org/apache/sandesha/server/queue/QueueException.java
Index: QueueException.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/QueueException.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- QueueException.java 28 Aug 2004 03:12:29 -0000 1.1
+++ QueueException.java 13 Sep 2004 11:26:51 -0000 1.2
@@ -16,7 +16,6 @@
*/
package org.apache.sandesha.server.queue;
-import org.apache.sandesha.RMSequence;
/**
* @author Chamikara Jayalath
@@ -24,9 +23,9 @@
*/
public class QueueException extends Exception {
-
- public QueueException(String msg){
- super(msg);
-
- }
-}
+
+ public QueueException(String msg) {
+ super(msg);
+
+ }
+}
\ No newline at end of file
1.2 +193 -207 ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java
Index: ResponseSequenceHash.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ResponseSequenceHash.java 7 Sep 2004 10:57:30 -0000 1.1
+++ ResponseSequenceHash.java 13 Sep 2004 11:26:51 -0000 1.2
@@ -21,15 +21,10 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
-import java.util.Vector;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
-
-
-
-
/*
* Created on Aug 4, 2004 at 5:08:29 PM
*/
@@ -40,214 +35,205 @@
*/
/**
- * This class works as a hash map for storing response messages
- * until they are sent.
+ * This class works as a hash map for storing response messages until they are
+ * sent.
*/
public class ResponseSequenceHash {
-
-
- //following concepts was removed from responseQueue.
- //This was to reduce complexity. (Since here time is also taken into account)
- //private long lastProcessed;
- //private boolean hasMessagesToSend;
-
- private String sequenceId;
- private String outSequenceId;
- private boolean outSeqApproved;
- private HashMap hash;
- private long nextAutoNumber; // key for storing messages.
-
- public ResponseSequenceHash(String sequenceId){
- //lastProcessed = 0;
- //hasMessagesToSend = false;
- this.sequenceId = sequenceId;
- hash = new HashMap();
- nextAutoNumber = 1; //This is the key for storing messages.
- outSeqApproved = false;
- }
-
- /*public boolean hasMessagesToSend(){
- return hasMessagesToSend;
- }*/
-
-
- public String getSequenceId() {
- return sequenceId;
- }
-
- public void setSequenceId(String sequenceId) {
- this.sequenceId = sequenceId;
- }
-
- public boolean isOutSeqApproved() {
- return outSeqApproved;
- }
-
- public void setOutSeqApproved(boolean b) {
- outSeqApproved = b;
- }
-
- public String getOutSequenceId() {
- return outSequenceId;
- }
-
- public void setOutSequenceId(String string) {
- outSequenceId = string;
- }
-
-
- /**
- * adds the message to map.
- */
- public Object putNewMessage(RMMessageContext msg) {
- Long key = new Long(nextAutoNumber);
- Object obj = hash.put(key, msg);
- increaseAutoNo();
- //refreshHasProcessableMessages();
- return obj;
- }
-
- /**
- * Increases auto number by 1.
- */
- private void increaseAutoNo(){
- nextAutoNumber++;
- }
-
- /**
- * Removes a message from the hash map.
- */
- public boolean removeMessage(long id){
- //TODO: Add messageremoving code if needed.
- boolean removed = false;
-
- Long key = new Long(id);
- Object obj = hash.remove(key);
-
- if(obj!=null)
- removed = true;
-
+ //The following concepts were removed from responseQueue.
+ //This was to reduce complexity. (Since here time is also taken into
+ // account)
+ //private long lastProcessed;
+ //private boolean hasMessagesToSend;
+
+ private String sequenceId;
+
+ private String outSequenceId;
+
+ private boolean outSeqApproved;
+
+ private HashMap hash;
+
+ private long nextAutoNumber; // key for storing messages.
+
+ public ResponseSequenceHash(String sequenceId) {
+ //lastProcessed = 0;
+ //hasMessagesToSend = false;
+ this.sequenceId = sequenceId;
+ hash = new HashMap();
+ nextAutoNumber = 1; //This is the key for storing messages.
+ outSeqApproved = false;
+ }
+
+ /*
+ * public boolean hasMessagesToSend(){ return hasMessagesToSend; }
+ */
+
+ public String getSequenceId() {
+ return sequenceId;
+ }
+
+ public void setSequenceId(String sequenceId) {
+ this.sequenceId = sequenceId;
+ }
+
+ public boolean isOutSeqApproved() {
+ return outSeqApproved;
+ }
+
+ public void setOutSeqApproved(boolean b) {
+ outSeqApproved = b;
+ }
+
+ public String getOutSequenceId() {
+ return outSequenceId;
+ }
+
+ public void setOutSequenceId(String string) {
+ outSequenceId = string;
+ }
+
+ /**
+ * adds the message to map.
+ */
+ public Object putNewMessage(RMMessageContext msg) {
+ Long key = new Long(nextAutoNumber);
+ Object obj = hash.put(key, msg);
+ increaseAutoNo();
+ //refreshHasProcessableMessages();
+ return obj;
+ }
+
+ /**
+ * Increases auto number by 1.
+ */
+ private void increaseAutoNo() {
+ nextAutoNumber++;
+ }
+
+ /**
+ * Removes a message from the hash map.
+ */
+ public boolean removeMessage(long id) {
+ //TODO: Add messageremoving code if needed.
+ boolean removed = false;
+
+ Long key = new Long(id);
+ Object obj = hash.remove(key);
+
+ if (obj != null)
+ removed = true;
+
return removed;
}
-
- /**
- * Returns the key of the next message to be sent.
- */
- /*public long getNextMessageKeyToSend(){
-
- long id = lastProcessed+1;
-
- return id;
- }*/
-
-
- /**
- * Returns the next deliverable message if has any.
- * Otherwise returns null.
- */
- public RMMessageContext getNextMessageToSend(){
- //Long nextKey = new Long (lastProcessed+1);
- //RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
-
- RMMessageContext msg=null;
-
- Iterator keys = hash.keySet().iterator();
-
- whileLoop:
- while(keys.hasNext()){
- RMMessageContext tempMsg;
- tempMsg = (RMMessageContext) hash.get(keys.next());
- long lastSentTime = tempMsg.getLastSentTime();
- //System.out.println("last sent time: "+lastSentTime);
- Date d = new Date();
- long currentTime = d.getTime();
- //System.out.println("current time: "+currentTime);
- // System.out.println("difference: "+(currentTime-lastSentTime));
- if(currentTime >= lastSentTime + Constants.RETRANSMISSION_INTERVAL){
- msg = tempMsg;
- msg.setLastSentTime(currentTime);
- break whileLoop;
- }
- }
-
- return msg;
- }
-
- /**
- * Gives all the deliverable messages of this sequence.
- * Resturns a vector.
- */
- /*public Vector getNextMessagesToSend(){
-
- boolean done = false;
- Vector messages = new Vector();
-
- while(!done){
- Long nextKey = new Long(lastProcessed+1);
- Object obj = hash.get(nextKey);
- if(obj!=null){
- messages.add(obj);
- incrementProcessedCount();
- }else{
- done=true; //To exit the loop.
- }
- }
- refreshHasProcessableMessages();
-
- return messages;
- }*/
-
- /*private void incrementProcessedCount(){
- lastProcessed++;
- }*/
-
- /*private void refreshHasProcessableMessages(){
- Long nextKey = new Long(lastProcessed+1);
- hasMessagesToSend = hash.containsKey(nextKey);
- }*/
-
- public boolean hasMessage(Long key){
- Object obj = hash.get(key);
-
- return (!(obj==null));
- }
-
- public void clearSequence(boolean yes){
- if(!yes)
- return;
-
- hash.clear();
- //lastProcessed = 0;
- //hasMessagesToSend = false;
- nextAutoNumber = 1;
- outSeqApproved = false;
- outSequenceId = null;
- sequenceId = null;
- }
-
- public Set getAllKeys(){
- return hash.keySet();
- }
-
- public String getMessageId(Long key){
- RMMessageContext msg = (RMMessageContext) hash.get(key);
- if(msg==null)
- return null;
-
- return msg.getMessageID();
-
+
+ /**
+ * Returns the key of the next message to be sent.
+ */
+ /*
+ * public long getNextMessageKeyToSend(){
+ *
+ * long id = lastProcessed+1;
+ *
+ * return id; }
+ */
+
+ /**
+ * Returns the next deliverable message if has any. Otherwise returns null.
+ */
+ public RMMessageContext getNextMessageToSend() {
+ //Long nextKey = new Long (lastProcessed+1);
+ //RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
+
+ RMMessageContext msg = null;
+
+ Iterator keys = hash.keySet().iterator();
+
+ whileLoop: while (keys.hasNext()) {
+ RMMessageContext tempMsg;
+ tempMsg = (RMMessageContext) hash.get(keys.next());
+ long lastSentTime = tempMsg.getLastSentTime();
+ //System.out.println("last sent time: "+lastSentTime);
+ Date d = new Date();
+ long currentTime = d.getTime();
+ //System.out.println("current time: "+currentTime);
+ // System.out.println("difference: "+(currentTime-lastSentTime));
+ if (currentTime >= lastSentTime + Constants.RETRANSMISSION_INTERVAL) {
+ msg = tempMsg;
+ msg.setLastSentTime(currentTime);
+ break whileLoop;
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * Gives all the deliverable messages of this sequence. Returns a vector.
+ */
+ /*
+ * public Vector getNextMessagesToSend(){
+ *
+ * boolean done = false; Vector messages = new Vector();
+ *
+ * while(!done){ Long nextKey = new Long(lastProcessed+1); Object obj =
+ * hash.get(nextKey); if(obj!=null){ messages.add(obj);
+ * incrementProcessedCount(); }else{ done=true; //To exit the loop. } }
+ * refreshHasProcessableMessages();
+ *
+ * return messages; }
+ */
+
+ /*
+ * private void incrementProcessedCount(){ lastProcessed++; }
+ */
+
+ /*
+ * private void refreshHasProcessableMessages(){ Long nextKey = new
+ * Long(lastProcessed+1); hasMessagesToSend = hash.containsKey(nextKey); }
+ */
+
+ public boolean hasMessage(Long key) {
+ Object obj = hash.get(key);
+
+ return (!(obj == null));
+ }
+
+ public void clearSequence(boolean yes) {
+ if (!yes)
+ return;
+
+ hash.clear();
+ //lastProcessed = 0;
+ //hasMessagesToSend = false;
+ nextAutoNumber = 1;
+ outSeqApproved = false;
+ outSequenceId = null;
+ sequenceId = null;
+ }
+
+ public Set getAllKeys() {
+ return hash.keySet();
+ }
+
+ public String getMessageId(Long key) {
+ RMMessageContext msg = (RMMessageContext) hash.get(key);
+ if (msg == null)
+ return null;
+
+ return msg.getMessageID();
+
}
-
+
//Deleting returns the deleted message.
- public RMMessageContext deleteMessage(Long msgId){
- RMMessageContext msg = (RMMessageContext) hash.get(msgId);
-
- if(msg==null)
- return null;
-
- hash.remove(msgId);
- return msg;
+ public RMMessageContext deleteMessage(Long msgId) {
+ RMMessageContext msg = (RMMessageContext) hash.get(msgId);
+
+ if (msg == null)
+ return null;
+
+ hash.remove(msgId);
+ return msg;
}
-}
+}
\ No newline at end of file
1.2 +146 -150 ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java
Index: SequenceHash.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SequenceHash.java 7 Sep 2004 10:56:24 -0000 1.1
+++ SequenceHash.java 13 Sep 2004 11:26:51 -0000 1.2
@@ -18,18 +18,10 @@
package org.apache.sandesha.server.queue;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
-import javax.xml.rpc.handler.MessageContext;
-
-import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
-import org.apache.sandesha.RMSequence;
-
-
-
/*
* Created on Aug 4, 2004 at 5:08:29 PM
@@ -42,147 +34,151 @@
public class SequenceHash {
- private long lastProcessed;
- private boolean hasProcessableMessages;
- private String sequenceId;
- //private String outSequenceId;
- private HashMap hash;
- private boolean beingProcessedLock = false; //When true messages are currently being processed.
-
- public SequenceHash(String sequenceId){
- lastProcessed = 0;
- //cacheBottom = 1;
- hasProcessableMessages = false;
- this.sequenceId = sequenceId;
- hash = new HashMap();
- }
-
- public boolean hasProcessableMessages(){
- return hasProcessableMessages;
- }
-
-
- public String getSequenceId() {
- return sequenceId;
- }
-
- public void setSequenceId(String sequenceId) {
- this.sequenceId = sequenceId;
- }
-
-
- /**
- * adds the message to map.
- * Also adds a record to cache if needed.
- */
- public Object putNewMessage(Long key, RMMessageContext value) {
-
- Object obj = hash.put(key, value);
- //addToCacheIfNeeded(key);
- refreshHasProcessableMessages();
- return obj;
- }
-
- public boolean removeMessage(long msgId){
- //TODO: Add messageremoving code if needed.
- boolean removed = false;
-
- Long key = new Long(msgId);
- Object obj = hash.remove(key);
-
- if(obj!=null)
- removed = true;
-
+ private long lastProcessed;
+
+ private boolean hasProcessableMessages;
+
+ private String sequenceId;
+
+ //private String outSequenceId;
+ private HashMap hash;
+
+ private boolean beingProcessedLock = false; //When true messages are
+ // currently being processed.
+
+ public SequenceHash(String sequenceId) {
+ lastProcessed = 0;
+ //cacheBottom = 1;
+ hasProcessableMessages = false;
+ this.sequenceId = sequenceId;
+ hash = new HashMap();
+ }
+
+ public boolean hasProcessableMessages() {
+ return hasProcessableMessages;
+ }
+
+ public String getSequenceId() {
+ return sequenceId;
+ }
+
+ public void setSequenceId(String sequenceId) {
+ this.sequenceId = sequenceId;
+ }
+
+ /**
+ * adds the message to map. Also adds a record to cache if needed.
+ */
+ public Object putNewMessage(Long key, RMMessageContext value) {
+
+ Object obj = hash.put(key, value);
+ //addToCacheIfNeeded(key);
+ refreshHasProcessableMessages();
+ return obj;
+ }
+
+ public boolean removeMessage(long msgId) {
+ //TODO: Add messageremoving code if needed.
+ boolean removed = false;
+
+ Long key = new Long(msgId);
+ Object obj = hash.remove(key);
+
+ if (obj != null)
+ removed = true;
+
return removed;
}
-
-
- public long getNextMessageIdToProcess(){
-
- long id = lastProcessed+1;
-
- return id;
- }
-
- public RMMessageContext getNextMessageToProcess(){
- Long nextKey = new Long (lastProcessed+1);
- RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
- if(msg!=null){
- incrementProcessedCount();
- refreshHasProcessableMessages();
- }else{
- setProcessLock(false); // Not a must. (det done in refreshHasProcessableMessages();)
- }
-
- return msg;
-
- }
-
- public Vector getNextMessagesToProcess(){
-
- boolean done = false;
- Vector messages = new Vector();
-
- while(!done){
- Long nextKey = new Long(lastProcessed+1);
- Object obj = hash.get(nextKey);
- if(obj!=null){
- messages.add(obj);
- incrementProcessedCount();
- }else{
- setProcessLock(false);
- done=true; //To exit the loop.
- }
- }
- refreshHasProcessableMessages();
-
- return messages;
- }
-
- private void incrementProcessedCount(){
- lastProcessed++;
- }
-
- private void refreshHasProcessableMessages(){
- Long nextKey = new Long(lastProcessed+1);
- hasProcessableMessages = hash.containsKey(nextKey);
-
- if(!hasProcessableMessages) //Cant be being procesed if no messages to process.
- setProcessLock(false);
- }
-
- public boolean hasMessage(Long msgId){
- Object obj = hash.get(msgId);
-
- return (!(obj==null));
- }
-
- public void clearSequence(boolean yes){
- if(!yes)
- return;
-
- hash.clear();
- lastProcessed = 0;
- hasProcessableMessages = false;
- }
-
- public Set getAllKeys(){
- return hash.keySet();
- }
- public void setProcessLock(boolean lock){
- beingProcessedLock = lock;
- }
-
- public boolean isSequenceLocked(){
- return beingProcessedLock;
- }
-
- public String getMessageId(Long key){
- RMMessageContext msg = (RMMessageContext) hash.get(key);
- if(msg==null)
- return null;
-
- return msg.getMessageID();
-
- }
-}
+
+ public long getNextMessageIdToProcess() {
+
+ long id = lastProcessed + 1;
+
+ return id;
+ }
+
+ public RMMessageContext getNextMessageToProcess() {
+ Long nextKey = new Long(lastProcessed + 1);
+ RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
+ if (msg != null) {
+ incrementProcessedCount();
+ refreshHasProcessableMessages();
+ } else {
+ setProcessLock(false); // Not a must. (det done in
+ // refreshHasProcessableMessages();)
+ }
+
+ return msg;
+
+ }
+
+ public Vector getNextMessagesToProcess() {
+
+ boolean done = false;
+ Vector messages = new Vector();
+
+ while (!done) {
+ Long nextKey = new Long(lastProcessed + 1);
+ Object obj = hash.get(nextKey);
+ if (obj != null) {
+ messages.add(obj);
+ incrementProcessedCount();
+ } else {
+ setProcessLock(false);
+ done = true; //To exit the loop.
+ }
+ }
+ refreshHasProcessableMessages();
+
+ return messages;
+ }
+
+ private void incrementProcessedCount() {
+ lastProcessed++;
+ }
+
+ private void refreshHasProcessableMessages() {
+ Long nextKey = new Long(lastProcessed + 1);
+ hasProcessableMessages = hash.containsKey(nextKey);
+
+ if (!hasProcessableMessages) //Cant be being procesed if no messages to
+ // process.
+ setProcessLock(false);
+ }
+
+ public boolean hasMessage(Long msgId) {
+ Object obj = hash.get(msgId);
+
+ return (!(obj == null));
+ }
+
+ public void clearSequence(boolean yes) {
+ if (!yes)
+ return;
+
+ hash.clear();
+ lastProcessed = 0;
+ hasProcessableMessages = false;
+ }
+
+ public Set getAllKeys() {
+ return hash.keySet();
+ }
+
+ public void setProcessLock(boolean lock) {
+ beingProcessedLock = lock;
+ }
+
+ public boolean isSequenceLocked() {
+ return beingProcessedLock;
+ }
+
+ public String getMessageId(Long key) {
+ RMMessageContext msg = (RMMessageContext) hash.get(key);
+ if (msg == null)
+ return null;
+
+ return msg.getMessageID();
+
+ }
+}
\ No newline at end of file
1.2 +559 -561 ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ServerQueue.java 7 Sep 2004 10:55:13 -0000 1.1
+++ ServerQueue.java 13 Sep 2004 11:26:51 -0000 1.2
@@ -16,6 +16,7 @@
*/
package org.apache.sandesha.server.queue;
+
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -23,14 +24,8 @@
import java.util.Set;
import java.util.Vector;
-import org.apache.log4j.Priority;
import org.apache.sandesha.Constants;
-import org.apache.sandesha.RMMessage;
import org.apache.sandesha.RMMessageContext;
-import org.apache.sandesha.RMSequence;
-import org.apache.sandesha.RMSequenceContext;
-import org.apache.sandesha.server.ReTransmissionProcessor;
-
/*
* Created on Aug 4, 2004 at 4:49:49 PM
@@ -43,572 +38,575 @@
public class ServerQueue {
- private static ServerQueue queue=null;
+ private static ServerQueue queue = null;
+
+ HashMap sequenceMap; //In comming messages.
+
+ HashMap responseMap; //Response messages
+
+ ArrayList responseQueue; // Acks and create seq. responses.
- HashMap sequenceMap; //In comming messages.
- HashMap responseMap; //Response messages
- ArrayList responseQueue; // Acks and create seq. responses.
-
- HashMap responseQueueBin; // Messaged processed from out queue will be moved to this.
-
- private ServerQueue(){
- sequenceMap = new HashMap();
- responseMap = new HashMap();
+ HashMap responseQueueBin; // Messaged processed from out queue will be moved
+ // to this.
+
+ private ServerQueue() {
+ sequenceMap = new HashMap();
+ responseMap = new HashMap();
responseQueue = new ArrayList();
- responseQueueBin = new HashMap();
- }
-
- public static ServerQueue getInstance() {
- if(queue==null){
- queue = new ServerQueue();
- }
- return queue;
- }
-
- //This is wrong. No sequence context object is inside queue.
- //Only sequence id and messages (in sequence hash).
- /*public void addRMSequenceContext(RMSequenceContext seq) throws QueueException{
-
- if(seq==null || seq.getSequenceID()==null || seq.getSequenceID().equals(""))
- throw new QueueException("Invalid Sequence");
-
-
- String seqId = seq.getSequenceID();
- createNewSequence(seqId);
-
- }*/
-
- //This is wrong. No sequence context object is inside queue.
- //Only sequence id and messages (in sequence hash).
- /*public RMSequenceContext getRMSequenceContext(String sequenceID){
- if(sequenceID==null)
- return null;
-
- Object obj = sequenceMap.get(sequenceID);
- if(obj!=null && (obj instanceof RMSequenceContext))
- return (RMSequenceContext) sequenceMap.get(sequenceID);
- else
- return null;
- }*/
-
-
- /**
- * This will not replace messages automatically.
- */
- public boolean addMessageToSequence(String seqId,Long messageNo,RMMessageContext msgCon) throws QueueException{
- boolean successful = false;
-
- if(seqId==null || msgCon==null)
- throw new QueueException("Error in adding message");
-
-
-
- if(isSequenceExists(seqId)){
- SequenceHash seqHash = (SequenceHash) sequenceMap.get(seqId);
-
- synchronized (seqHash) {
-
- if(seqHash==null)
- throw new QueueException("Inconsistent queue");
-
- if(seqHash.hasMessage(messageNo))
+ responseQueueBin = new HashMap();
+ }
+
+ public static ServerQueue getInstance() {
+ if (queue == null) {
+ queue = new ServerQueue();
+ }
+ return queue;
+ }
+
+ //This is wrong. No sequence context object is inside queue.
+ //Only sequence id and messages (in sequence hash).
+ /*
+ * public void addRMSequenceContext(RMSequenceContext seq) throws
+ * QueueException{
+ *
+ * if(seq==null || seq.getSequenceID()==null ||
+ * seq.getSequenceID().equals("")) throw new QueueException("Invalid
+ * Sequence");
+ *
+ *
+ * String seqId = seq.getSequenceID(); createNewSequence(seqId);
+ * }
+ */
+
+ //This is wrong. No sequence context object is inside queue.
+ //Only sequence id and messages (in sequence hash).
+ /*
+ * public RMSequenceContext getRMSequenceContext(String sequenceID){
+ * if(sequenceID==null) return null;
+ *
+ * Object obj = sequenceMap.get(sequenceID); if(obj!=null && (obj instanceof
+ * RMSequenceContext)) return (RMSequenceContext)
+ * sequenceMap.get(sequenceID); else return null; }
+ */
+
+ /**
+ * This will not replace messages automatically.
+ */
+ public boolean addMessageToSequence(String seqId, Long messageNo,
+ RMMessageContext msgCon) throws QueueException {
+ boolean successful = false;
+
+ if (seqId == null || msgCon == null)
+ throw new QueueException("Error in adding message");
+
+ if (isSequenceExists(seqId)) {
+ SequenceHash seqHash = (SequenceHash) sequenceMap.get(seqId);
+
+ synchronized (seqHash) {
+
+ if (seqHash == null)
+ throw new QueueException("Inconsistent queue");
+
+ if (seqHash.hasMessage(messageNo))
throw new QueueException("Message already exists");
- //Messages will not be replaced automatically.
-
- seqHash.putNewMessage(messageNo,msgCon);
- }
+ //Messages will not be replaced automatically.
+
+ seqHash.putNewMessage(messageNo, msgCon);
+ }
}
-
+
return successful;
- }
+ }
/**
- *
+ *
*/
- public boolean addMessageToResponseSequence(String seqId,RMMessageContext msgCon) throws QueueException{
- boolean successful = false;
-
- if(seqId==null || msgCon==null)
- throw new QueueException("Error in adding message");
-
- if(isResponseSequenceExists(seqId)){
- ResponseSequenceHash resSeqHash = (ResponseSequenceHash) responseMap.get(seqId);
-
- synchronized (resSeqHash) {
-
- if(resSeqHash==null)
- throw new QueueException("Inconsistent queue");
-
- resSeqHash.putNewMessage(msgCon);
- }
- }
-
- return successful;
- }
-
- public boolean messagePresentInSequence(String sequenceId,Long messageNo) throws QueueException{
-
- SequenceHash seqHash = (SequenceHash) sequenceMap.get(sequenceId);
-
- if(seqHash==null)
- throw new QueueException("Sequence not present");
-
- synchronized (seqHash){
- return seqHash.hasMessage(messageNo);
- }
- }
-
- public boolean isSequenceExists(String seqId){
-
- synchronized (sequenceMap){
-
- return sequenceMap.containsKey(seqId);
- }
- }
-
- public boolean isResponseSequenceExists(String resSeqId){
-
- synchronized (responseMap){
-
- return responseMap.containsKey(resSeqId);
- }
- }
-
- public String nextSequenceIdToProcess(){
-
- synchronized (sequenceMap){
-
- int count = sequenceMap.size();
- Iterator it = sequenceMap.keySet().iterator();
- SequenceHash sh = null;
- String seqId = null;
-
- whileLoop:
- while(it.hasNext()){
- String tempSeqId = (String) it.next();
- sh = (SequenceHash) sequenceMap.get(tempSeqId);
- if(sh.hasProcessableMessages()){
- seqId = tempSeqId;
- break whileLoop;
- }
- }
-
- return seqId;
- }
- }
-
- public RMMessageContext nextMessageToProcess(String sequenceId) throws QueueException{
-
- if(sequenceId==null)
- return null;
-
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
-
- synchronized (sh) {
+ public boolean addMessageToResponseSequence(String seqId,
+ RMMessageContext msgCon) throws QueueException {
+ boolean successful = false;
+
+ if (seqId == null || msgCon == null)
+ throw new QueueException("Error in adding message");
+
+ if (isResponseSequenceExists(seqId)) {
+ ResponseSequenceHash resSeqHash = (ResponseSequenceHash) responseMap
+ .get(seqId);
+
+ synchronized (resSeqHash) {
+
+ if (resSeqHash == null)
+ throw new QueueException("Inconsistent queue");
+
+ resSeqHash.putNewMessage(msgCon);
+ }
+ }
+
+ return successful;
+ }
+
+ public boolean messagePresentInSequence(String sequenceId, Long messageNo)
+ throws QueueException {
+
+ SequenceHash seqHash = (SequenceHash) sequenceMap.get(sequenceId);
- if(sh==null)
+ if (seqHash == null)
+ throw new QueueException("Sequence not present");
+
+ synchronized (seqHash) {
+ return seqHash.hasMessage(messageNo);
+ }
+ }
+
+ public boolean isSequenceExists(String seqId) {
+
+ synchronized (sequenceMap) {
+
+ return sequenceMap.containsKey(seqId);
+ }
+ }
+
+ public boolean isResponseSequenceExists(String resSeqId) {
+
+ synchronized (responseMap) {
+
+ return responseMap.containsKey(resSeqId);
+ }
+ }
+
+ public String nextSequenceIdToProcess() {
+
+ synchronized (sequenceMap) {
+
+ int count = sequenceMap.size();
+ Iterator it = sequenceMap.keySet().iterator();
+ SequenceHash sh = null;
+ String seqId = null;
+
+ whileLoop: while (it.hasNext()) {
+ String tempSeqId = (String) it.next();
+ sh = (SequenceHash) sequenceMap.get(tempSeqId);
+ if (sh.hasProcessableMessages()) {
+ seqId = tempSeqId;
+ break whileLoop;
+ }
+ }
+
+ return seqId;
+ }
+ }
+
+ public RMMessageContext nextMessageToProcess(String sequenceId)
+ throws QueueException {
+
+ if (sequenceId == null)
+ return null;
+
+ SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+
+ synchronized (sh) {
+
+ if (sh == null)
throw new QueueException("Sequence id doen not exist");
-
- if(!sh.hasProcessableMessages())
- return null;
-
- RMMessageContext msgCon = sh.getNextMessageToProcess();
- return msgCon;
-
- }
- }
-
- public RMMessageContext nextResponseMessageToSend() throws QueueException {
-
- RMMessageContext msg=null;
-
- synchronized (responseMap) {
-
- Iterator it = responseMap.keySet().iterator();
-
- whileLoop:
- while(it.hasNext()){
- RMMessageContext tempMsg;
- String tempKey = (String) it.next();
-
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(tempKey);
- if(rsh.isOutSeqApproved()){
- tempMsg = rsh.getNextMessageToSend();
- if(tempMsg!=null){
- msg = tempMsg;
- break whileLoop;
- }
- }
- }
- }
- return msg;
- }
-
- public void createNewSequence(String sequenceId) throws QueueException {
- if(sequenceId==null)
- throw new QueueException("Sequence Id is null");
-
- synchronized (sequenceMap){
-
- SequenceHash sh = new SequenceHash(sequenceId);
- sequenceMap.put(sequenceId,sh);
- }
- }
-
- public void createNewResponseSequence(String sequenceId) throws QueueException {
- if(sequenceId==null)
- throw new QueueException("Sequence Id is null");
-
- synchronized (responseMap){
-
- ResponseSequenceHash rsh = new ResponseSequenceHash(sequenceId);
- responseMap.put(sequenceId,rsh);
- }
- }
- /**
- * Adds a new message to the responses queue.
- *
- */
- public void addPriorityMessage(RMMessageContext msg) throws QueueException {
-
- synchronized (responseQueue){
-
- if(msg==null)
- throw new QueueException("Message is null");
-
- responseQueue.add(msg);
- }
- }
-
-
- public RMMessageContext nextPriorityMessageToSend() throws QueueException{
-
- synchronized (responseQueue){
-
- if(responseQueue.size()<=0)
- return null;
-
- //RMMessageContext msg = (RMMessageContext) responseQueue.get(0);
- RMMessageContext msg = null;
- int size = responseQueue.size();
-
- synchronized (responseQueue){
-
-
- forLoop: //Label
- for(int i=0;i<size;i++){
- RMMessageContext tempMsg = (RMMessageContext) responseQueue.get(i);
- if(tempMsg!=null){
-
- switch (tempMsg.getMessageType()){
- //Create seq messages will not be removed.
- case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
- long lastSentTime = tempMsg.getLastSentTime();
- Date d = new Date();
- long currentTime = d.getTime();
- if(currentTime >= lastSentTime+ Constants.RETRANSMISSION_INTERVAL){
- tempMsg.setLastSentTime(currentTime);
- msg = tempMsg;
- break forLoop;
- }
- break;
-
- //Other msgs will be removed.
- //These include CreareSeqResponses and Acknowledgements.
- default :
- responseQueue.remove(i);
- responseQueueBin.put(tempMsg.getMessageID(),tempMsg);
- msg = tempMsg;
- break forLoop;
- }
-
- }
- }
- }
-
- return msg;
-
- }
- }
-
- /*public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
- if(sh==null)
- return null;
-
- synchronized (sh) {
- if(!sh.hasNewMessages())
- return null;
-
- Long key = sh.
- }
- }*/
-
- public Vector nextAllMessagesToProcess(String sequenceId) throws QueueException{
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
-
- synchronized (sh){
- Vector v = sh.getNextMessagesToProcess();
- return v;
- }
- }
-
-
- //Folowing func. may cause errors.
- /*public Vector nextAllResponseMessagesToSend(String sequenceId) throws QueueException{
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
- Vector v = new Vector();
- synchronized (rsh){
- RMMessageContext msg = nextAllResponseMessagesToSend()
-
- while(msg!=null){
- v.add(msg);
- msg = rsh.getNextMessageToSend();
- }
- return v;
- }
- }*/
-
-
- public Vector nextAllSeqIdsToProcess(){
- Vector ids = new Vector();
-
- synchronized (sequenceMap){
- Iterator it = sequenceMap.keySet().iterator();
-
- while(it.hasNext()){
- Object tempKey = it.next();
- SequenceHash sh = (SequenceHash) sequenceMap.get(tempKey);
- if(sh.hasProcessableMessages() && !sh.isSequenceLocked())
- ids.add(sh.getSequenceId());
- }
- return ids;
- }
- }
-
- /*public Vector nextAllResponseSeqIdsToSend(){
- Vector ids = new Vector();
-
- synchronized (responseMap){
- Iterator it = responseMap.keySet().iterator();
-
- while(it.hasNext()){
- Object tempKey = it.next();
- ResponseSequenceHash sh = (ResponseSequenceHash) responseMap.get(tempKey);
- if(sh.hasProcessableMessages())
- ids.add(sh.getSequenceId());
- }
- }
- return ids;
- }*/
-
- public void clear(boolean yes){
- if(!yes)
- return;
-
- sequenceMap.clear();
- responseQueue.clear();
- responseMap.clear();
+
+ if (!sh.hasProcessableMessages())
+ return null;
+
+ RMMessageContext msgCon = sh.getNextMessageToProcess();
+ return msgCon;
+
+ }
+ }
+
+ public RMMessageContext nextResponseMessageToSend() throws QueueException {
+
+ RMMessageContext msg = null;
+
+ synchronized (responseMap) {
+
+ Iterator it = responseMap.keySet().iterator();
+
+ whileLoop: while (it.hasNext()) {
+ RMMessageContext tempMsg;
+ String tempKey = (String) it.next();
+
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(tempKey);
+ if (rsh.isOutSeqApproved()) {
+ tempMsg = rsh.getNextMessageToSend();
+ if (tempMsg != null) {
+ msg = tempMsg;
+ break whileLoop;
+ }
+ }
+ }
+ }
+ return msg;
+ }
+
+ public void createNewSequence(String sequenceId) throws QueueException {
+ if (sequenceId == null)
+ throw new QueueException("Sequence Id is null");
+
+ synchronized (sequenceMap) {
+
+ SequenceHash sh = new SequenceHash(sequenceId);
+ sequenceMap.put(sequenceId, sh);
+ }
+ }
+
+ public void createNewResponseSequence(String sequenceId)
+ throws QueueException {
+ if (sequenceId == null)
+ throw new QueueException("Sequence Id is null");
+
+ synchronized (responseMap) {
+
+ ResponseSequenceHash rsh = new ResponseSequenceHash(sequenceId);
+ responseMap.put(sequenceId, rsh);
+ }
+ }
+
+ /**
+ * Adds a new message to the responses queue.
+ *
+ */
+ public void addPriorityMessage(RMMessageContext msg) throws QueueException {
+
+ synchronized (responseQueue) {
+
+ if (msg == null)
+ throw new QueueException("Message is null");
+
+ responseQueue.add(msg);
+ }
+ }
+
+ public RMMessageContext nextPriorityMessageToSend() throws QueueException {
+
+ synchronized (responseQueue) {
+
+ if (responseQueue.size() <= 0)
+ return null;
+
+ //RMMessageContext msg = (RMMessageContext) responseQueue.get(0);
+ RMMessageContext msg = null;
+ int size = responseQueue.size();
+
+ synchronized (responseQueue) {
+
+ forLoop: //Label
+ for (int i = 0; i < size; i++) {
+ RMMessageContext tempMsg = (RMMessageContext) responseQueue
+ .get(i);
+ if (tempMsg != null) {
+
+ switch (tempMsg.getMessageType()) {
+ //Create seq messages will not be removed.
+ case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
+ long lastSentTime = tempMsg.getLastSentTime();
+ Date d = new Date();
+ long currentTime = d.getTime();
+ if (currentTime >= lastSentTime
+ + Constants.RETRANSMISSION_INTERVAL) {
+ tempMsg.setLastSentTime(currentTime);
+ msg = tempMsg;
+ break forLoop;
+ }
+ break;
+
+ //Other msgs will be removed.
+ //These include CreareSeqResponses and
+ // Acknowledgements.
+ default:
+ responseQueue.remove(i);
+ responseQueueBin.put(tempMsg.getMessageID(),
+ tempMsg);
+ msg = tempMsg;
+ break forLoop;
+ }
+
+ }
+ }
+ }
+
+ return msg;
+
+ }
+ }
+
+ /*
+ * public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
+ * SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+ * if(sh==null) return null;
+ *
+ * synchronized (sh) { if(!sh.hasNewMessages()) return null;
+ *
+ * Long key = sh. } }
+ */
+
+ public Vector nextAllMessagesToProcess(String sequenceId)
+ throws QueueException {
+ SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+
+ synchronized (sh) {
+ Vector v = sh.getNextMessagesToProcess();
+ return v;
+ }
+ }
+
+ //Folowing func. may cause errors.
+ /*
+ * public Vector nextAllResponseMessagesToSend(String sequenceId) throws
+ * QueueException{ ResponseSequenceHash rsh = (ResponseSequenceHash)
+ * responseMap.get(sequenceId); Vector v = new Vector(); synchronized (rsh){
+ * RMMessageContext msg = nextAllResponseMessagesToSend()
+ *
+ * while(msg!=null){ v.add(msg); msg = rsh.getNextMessageToSend(); } return
+ * v; } }
+ */
+
+ public Vector nextAllSeqIdsToProcess() {
+ Vector ids = new Vector();
+
+ synchronized (sequenceMap) {
+ Iterator it = sequenceMap.keySet().iterator();
+
+ while (it.hasNext()) {
+ Object tempKey = it.next();
+ SequenceHash sh = (SequenceHash) sequenceMap.get(tempKey);
+ if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
+ ids.add(sh.getSequenceId());
+ }
+ return ids;
+ }
+ }
+
+ /*
+ * public Vector nextAllResponseSeqIdsToSend(){ Vector ids = new Vector();
+ *
+ * synchronized (responseMap){ Iterator it =
+ * responseMap.keySet().iterator();
+ *
+ * while(it.hasNext()){ Object tempKey = it.next(); ResponseSequenceHash sh =
+ * (ResponseSequenceHash) responseMap.get(tempKey);
+ * if(sh.hasProcessableMessages()) ids.add(sh.getSequenceId()); } } return
+ * ids; }
+ */
+
+ public void clear(boolean yes) {
+ if (!yes)
+ return;
+
+ sequenceMap.clear();
+ responseQueue.clear();
+ responseMap.clear();
responseQueueBin.clear();
- }
-
- public void removeAllMsgsFromSeqence(String seqId,boolean yes){
- if(!yes)
- return;
-
- SequenceHash sh = (SequenceHash) sequenceMap.get(seqId);
- sh.clearSequence(yes);
- }
-
- public void removeAllMsgsFromResponseSeqence(String seqId,boolean yes){
- if(!yes)
- return;
-
- ResponseSequenceHash sh = (ResponseSequenceHash) responseMap.get(seqId);
- sh.clearSequence(yes);
- }
-
- public void removeSequence(String sequenceId,boolean yes){
- if(!yes)
- return;
-
- sequenceMap.remove(sequenceId);
- }
-
- public void removeResponseSequence(String sequenceId,boolean yes){
- if(!yes)
- return;
-
- synchronized (responseMap) {
- responseMap.remove(sequenceId);
- }
- }
-
- public void setSequenceLock(String sequenceId,boolean lock){
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
- sh.setProcessLock(lock);
- }
-
- public Set getAllReceivedMsgNumsOfSeq(String sequenceId){
- Vector v = new Vector();
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
- return sh.getAllKeys();
- }
-
- public Set getAllReceivedMsgNumsOfResponseSeq(String sequenceId){
- Vector v = new Vector();
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
- synchronized (rsh) {
- return rsh.getAllKeys();
- }
- }
-
- public boolean isMessageExists(String sequenceId,Long messageNo){
- SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
- //sh can be null if there are no messages at the initial point.
- if(sh!=null)
- return sh.hasMessage(messageNo);
- else
- return false;
- }
-
- public void setOutSequence(String seqId,String outSeqId){
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(seqId);
-
- if(rsh==null){
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
- return;
- }
-
- synchronized (rsh) {
- rsh.setOutSequenceId(outSeqId);
- }
- }
-
- public void setOutSequenceApproved(String seqId,boolean approved){
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(seqId);
-
- if(rsh==null){
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
- return;
- }
- synchronized (rsh) {
- rsh.setOutSeqApproved(approved);
- }
- }
-
- public String getSequenceOfOutSequence(String outSequence){
- if(outSequence==null)
- return null;
-
- Iterator it = responseMap.keySet().iterator();
- synchronized (responseMap){
- while(it.hasNext()){
- String tempSeqId = (String) it.next();
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(tempSeqId);
- String tempOutSequence = rsh.getOutSequenceId();
-
- if(outSequence.equals(tempOutSequence))
- return tempSeqId;
-
- }
- }
- return null;
- }
-
-
- public void displayResponseMap(){
- Iterator it = responseMap.keySet().iterator();
- System.out.println("***************************************");
- System.out.println(" DISPLAYING RESPONSE MAP");
- System.out.println(" -----------------------");
- while(it.hasNext()){
- String s = (String) it.next();
- System.out.println("\n Sequence id - "+s);
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(s);
-
+ }
+
+ public void removeAllMsgsFromSeqence(String seqId, boolean yes) {
+ if (!yes)
+ return;
+
+ SequenceHash sh = (SequenceHash) sequenceMap.get(seqId);
+ sh.clearSequence(yes);
+ }
+
+ public void removeAllMsgsFromResponseSeqence(String seqId, boolean yes) {
+ if (!yes)
+ return;
+
+ ResponseSequenceHash sh = (ResponseSequenceHash) responseMap.get(seqId);
+ sh.clearSequence(yes);
+ }
+
+ public void removeSequence(String sequenceId, boolean yes) {
+ if (!yes)
+ return;
+
+ sequenceMap.remove(sequenceId);
+ }
+
+ public void removeResponseSequence(String sequenceId, boolean yes) {
+ if (!yes)
+ return;
+
+ synchronized (responseMap) {
+ responseMap.remove(sequenceId);
+ }
+ }
+
+ public void setSequenceLock(String sequenceId, boolean lock) {
+ SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+ sh.setProcessLock(lock);
+ }
+
+ public Set getAllReceivedMsgNumsOfSeq(String sequenceId) {
+ Vector v = new Vector();
+ SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+ return sh.getAllKeys();
+ }
+
+ public Set getAllReceivedMsgNumsOfResponseSeq(String sequenceId) {
+ Vector v = new Vector();
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(sequenceId);
+ synchronized (rsh) {
+ return rsh.getAllKeys();
+ }
+ }
+
+ public boolean isMessageExists(String sequenceId, Long messageNo) {
+ SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
+ //sh can be null if there are no messages at the initial point.
+ if (sh != null)
+ return sh.hasMessage(messageNo);
+ else
+ return false;
+ }
+
+ public void setOutSequence(String seqId, String outSeqId) {
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(seqId);
+
+ if (rsh == null) {
+ System.out.println("ERROR: RESPONSE SEQ IS NULL");
+ return;
+ }
+
+ synchronized (rsh) {
+ rsh.setOutSequenceId(outSeqId);
+ }
+ }
+
+ public void setOutSequenceApproved(String seqId, boolean approved) {
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(seqId);
+
+ if (rsh == null) {
+ System.out.println("ERROR: RESPONSE SEQ IS NULL");
+ return;
+ }
+ synchronized (rsh) {
+ rsh.setOutSeqApproved(approved);
+ }
+ }
+
+ public String getSequenceOfOutSequence(String outSequence) {
+ if (outSequence == null)
+ return null;
+
+ Iterator it = responseMap.keySet().iterator();
+ synchronized (responseMap) {
+ while (it.hasNext()) {
+ String tempSeqId = (String) it.next();
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(tempSeqId);
+ String tempOutSequence = rsh.getOutSequenceId();
+
+ if (outSequence.equals(tempOutSequence))
+ return tempSeqId;
+
+ }
+ }
+ return null;
+ }
+
+ public void displayResponseMap() {
+ Iterator it = responseMap.keySet().iterator();
+ System.out.println("***************************************");
+ System.out.println(" DISPLAYING RESPONSE MAP");
+ System.out.println(" -----------------------");
+ while (it.hasNext()) {
+ String s = (String) it.next();
+ System.out.println("\n Sequence id - " + s);
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(s);
+
Iterator it1 = rsh.getAllKeys().iterator();
- while(it1.hasNext()){
- Long l = (Long) it1.next();
- String msgId = rsh.getMessageId(l);
- System.out.println("* key -"+l.longValue()+"- MessageID -"+msgId+"-");
- }
- }
- System.out.println("***************************************");
- }
-
- public void displaySequenceMap(){
- Iterator it = sequenceMap.keySet().iterator();
- System.out.println("***************************************");
- System.out.println(" DISPLAYING SEQUENCE MAP");
- System.out.println(" -----------------------");
- while(it.hasNext()){
- String s = (String) it.next();
- System.out.println("\n Sequence id - "+s);
- SequenceHash sh = (SequenceHash) sequenceMap.get(s);
-
- Iterator it1 = sh.getAllKeys().iterator();
- while(it1.hasNext()){
- Long l = (Long) it1.next();
- String msgId = sh.getMessageId(l);
- System.out.println("* key -"+l.longValue()+"- MessageID -"+msgId+"-");
- }
- }
- System.out.println("***************************************");
- }
-
- public void displayPriorityQueue(){
-
- System.out.println("***************************************");
- System.out.println(" DISPLAYING PRIORITY QUEUE");
- System.out.println(" -------------------------");
-
- Iterator it = responseQueue.iterator();
- while(it.hasNext()){
- RMMessageContext msg = (RMMessageContext) it.next();
- String id = msg.getMessageID();
- int type = msg.getMessageType();
-
- System.out.println("Message "+id+" Type "+type);
- }
- System.out.println("***************************************");
- }
-
- public void moveResponseMsgToBin(String sequenceId,Long messageNo){
- ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
-
- if(rsh==null){
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
- return;
- }
-
- synchronized (rsh){
- //Deleting retuns the deleted message.
- RMMessageContext msg = rsh.deleteMessage(messageNo);
- String msgId = msg.getMessageID();
-
- //Add msg to bin if id isnt null.
- if(msgId!=null)
- responseQueueBin.put(msgId,msg);
-
- }
- }
-
- public void movePriorityMsgToBin(String messageId){
- synchronized (responseQueue){
-
- int size = responseQueue.size();
-
- for(int i=0;i<size;i++){
- RMMessageContext msg = (RMMessageContext) responseQueue.get(i);
-
- if(msg.getMessageID().equals(messageId)){
- responseQueue.remove(i);
- responseQueueBin.put(messageId,msg);
- return;
- }
- }
- }
- }
+ while (it1.hasNext()) {
+ Long l = (Long) it1.next();
+ String msgId = rsh.getMessageId(l);
+ System.out.println("* key -" + l.longValue() + "- MessageID -"
+ + msgId + "-");
+ }
+ }
+ System.out.println("***************************************");
+ }
+
+ public void displaySequenceMap() {
+ Iterator it = sequenceMap.keySet().iterator();
+ System.out.println("***************************************");
+ System.out.println(" DISPLAYING SEQUENCE MAP");
+ System.out.println(" -----------------------");
+ while (it.hasNext()) {
+ String s = (String) it.next();
+ System.out.println("\n Sequence id - " + s);
+ SequenceHash sh = (SequenceHash) sequenceMap.get(s);
+
+ Iterator it1 = sh.getAllKeys().iterator();
+ while (it1.hasNext()) {
+ Long l = (Long) it1.next();
+ String msgId = sh.getMessageId(l);
+ System.out.println("* key -" + l.longValue() + "- MessageID -"
+ + msgId + "-");
+ }
+ }
+ System.out.println("***************************************");
+ }
+
+ public void displayPriorityQueue() {
+
+ System.out.println("***************************************");
+ System.out.println(" DISPLAYING PRIORITY QUEUE");
+ System.out.println(" -------------------------");
+
+ Iterator it = responseQueue.iterator();
+ while (it.hasNext()) {
+ RMMessageContext msg = (RMMessageContext) it.next();
+ String id = msg.getMessageID();
+ int type = msg.getMessageType();
+
+ System.out.println("Message " + id + " Type " + type);
+ }
+ System.out.println("***************************************");
+ }
+
+ public void moveResponseMsgToBin(String sequenceId, Long messageNo) {
+ ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap
+ .get(sequenceId);
+
+ if (rsh == null) {
+ System.out.println("ERROR: RESPONSE SEQ IS NULL");
+ return;
+ }
+
+ synchronized (rsh) {
+ //Deleting retuns the deleted message.
+ RMMessageContext msg = rsh.deleteMessage(messageNo);
+ String msgId = msg.getMessageID();
+
+ //Add msg to bin if id isnt null.
+ if (msgId != null)
+ responseQueueBin.put(msgId, msg);
+
+ }
+ }
+
+ public void movePriorityMsgToBin(String messageId) {
+ synchronized (responseQueue) {
+
+ int size = responseQueue.size();
+
+ for (int i = 0; i < size; i++) {
+ RMMessageContext msg = (RMMessageContext) responseQueue.get(i);
+
+ if (msg.getMessageID().equals(messageId)) {
+ responseQueue.remove(i);
+ responseQueueBin.put(messageId, msg);
+ return;
+ }
+ }
+ }
+ }
}