You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/10/20 12:32:18 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server/queue SequenceHash.java ServerQueue.java
jaliya 2004/10/20 03:32:18
Modified: sandesha/src/org/apache/sandesha/server/queue
SequenceHash.java ServerQueue.java
Log:
Fixed the bugs and formatted the code
Revision Changes Path
1.3 +4 -3 ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java
Index: SequenceHash.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SequenceHash.java 13 Sep 2004 11:26:51 -0000 1.2
+++ SequenceHash.java 20 Oct 2004 10:32:18 -0000 1.3
@@ -44,7 +44,8 @@
private HashMap hash;
private boolean beingProcessedLock = false; //When true messages are
- // currently being processed.
+
+ // currently being processed.
public SequenceHash(String sequenceId) {
lastProcessed = 0;
@@ -105,7 +106,7 @@
refreshHasProcessableMessages();
} else {
setProcessLock(false); // Not a must. (det done in
- // refreshHasProcessableMessages();)
+ // refreshHasProcessableMessages();)
}
return msg;
@@ -142,7 +143,7 @@
hasProcessableMessages = hash.containsKey(nextKey);
if (!hasProcessableMessages) //Cant be being procesed if no messages to
- // process.
+ // process.
setProcessLock(false);
}
1.5 +65 -86 ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ServerQueue.java 9 Oct 2004 08:50:23 -0000 1.4
+++ ServerQueue.java 20 Oct 2004 10:32:18 -0000 1.5
@@ -47,7 +47,8 @@
ArrayList priorityQueue; // Acks and create seq. responses.
HashMap queueBin; // Messaged processed from out queue will be moved
- // to this.
+
+ // to this.
private ServerQueue() {
incomingMap = new HashMap();
@@ -63,32 +64,6 @@
return queue;
}
- //This is wrong. No sequence context object is inside queue.
- //Only sequence id and messages (in sequence hash).
- /*
- * public void addRMSequenceContext(RMSequenceContext seq) throws
- * QueueException{
- *
- * if(seq==null || seq.getSequenceID()==null ||
- * seq.getSequenceID().equals("")) throw new QueueException("Invalid
- * Sequence");
- *
- *
- * String seqId = seq.getSequenceID(); createNewSequence(seqId);
- * }
- */
-
- //This is wrong. No sequence context object is inside queue.
- //Only sequence id and messages (in sequence hash).
- /*
- * public RMSequenceContext getRMSequenceContext(String sequenceID){
- * if(sequenceID==null) return null;
- *
- * Object obj = incomingMap.get(sequenceID); if(obj!=null && (obj instanceof
- * RMSequenceContext)) return (RMSequenceContext)
- * incomingMap.get(sequenceID); else return null; }
- */
-
/**
* This will not replace messages automatically.
*/
@@ -136,7 +111,6 @@
if (resSeqHash == null)
throw new QueueException("Inconsistent queue");
-
resSeqHash.putNewMessage(msgCon);
}
}
@@ -144,8 +118,8 @@
return successful;
}
- public boolean messagePresentInIncomingSequence(String sequenceId, Long messageNo)
- throws QueueException {
+ public boolean messagePresentInIncomingSequence(String sequenceId,
+ Long messageNo) throws QueueException {
SequenceHash seqHash = (SequenceHash) incomingMap.get(sequenceId);
@@ -160,7 +134,7 @@
public boolean isIncomingSequenceExists(String seqId) {
synchronized (incomingMap) {
-
+
return incomingMap.containsKey(seqId);
}
}
@@ -228,7 +202,6 @@
whileLoop: while (it.hasNext()) {
RMMessageContext tempMsg;
String tempKey = (String) it.next();
-
ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
.get(tempKey);
if (rsh.isOutSeqApproved()) {
@@ -245,7 +218,8 @@
return msg;
}
- public void createNewIncomingSequence(String sequenceId) throws QueueException {
+ public void createNewIncomingSequence(String sequenceId)
+ throws QueueException {
if (sequenceId == null)
throw new QueueException("Sequence Id is null");
@@ -322,8 +296,7 @@
// Acknowledgements.
default:
priorityQueue.remove(i);
- queueBin.put(tempMsg.getMessageID(),
- tempMsg);
+ queueBin.put(tempMsg.getMessageID(), tempMsg);
msg = tempMsg;
break forLoop;
}
@@ -472,7 +445,6 @@
.get(seqId);
if (rsh == null) {
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
return;
}
@@ -486,7 +458,6 @@
.get(seqId);
if (rsh == null) {
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
return;
}
synchronized (rsh) {
@@ -495,20 +466,23 @@
}
public String getSequenceOfOutSequence(String outSequence) {
- if (outSequence == null){
- return null;
+
+ if (outSequence == null) {
+ return null;
}
+
+ //Client will always handle a single seq
+ //if(outSequence==Constants.CLIENT_DEFAULD_SEQUENCE_ID)
+ // return outSequence;
+
Iterator it = outgoingMap.keySet().iterator();
synchronized (outgoingMap) {
while (it.hasNext()) {
-
-
+
String tempSeqId = (String) it.next();
-
ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
.get(tempSeqId);
String tempOutSequence = rsh.getOutSequenceId();
-
if (outSequence.equals(tempOutSequence))
return tempSeqId;
@@ -519,9 +493,9 @@
public void displayOutgoingMap() {
Iterator it = outgoingMap.keySet().iterator();
- System.out.println("***************************************");
- System.out.println(" DISPLAYING RESPONSE MAP");
- System.out.println(" -----------------------");
+ System.out.println("------------------------------------");
+ System.out.println(" DISPLAYING RESPONSE MAP");
+ System.out.println("------------------------------------");
while (it.hasNext()) {
String s = (String) it.next();
System.out.println("\n Sequence id - " + s);
@@ -536,14 +510,14 @@
+ msgId + "-");
}
}
- System.out.println("***************************************");
+ System.out.println("\n");
}
public void displayIncomingMap() {
Iterator it = incomingMap.keySet().iterator();
- System.out.println("***************************************");
- System.out.println(" DISPLAYING SEQUENCE MAP");
- System.out.println(" -----------------------");
+ System.out.println("------------------------------------");
+ System.out.println(" DISPLAYING SEQUENCE MAP");
+ System.out.println("------------------------------------");
while (it.hasNext()) {
String s = (String) it.next();
System.out.println("\n Sequence id - " + s);
@@ -557,14 +531,14 @@
+ msgId + "-");
}
}
- System.out.println("***************************************");
+ System.out.println("\n");
}
public void displayPriorityQueue() {
- System.out.println("***************************************");
- System.out.println(" DISPLAYING PRIORITY QUEUE");
- System.out.println(" -------------------------");
+ System.out.println("------------------------------------");
+ System.out.println(" DISPLAYING PRIORITY QUEUE");
+ System.out.println("------------------------------------");
Iterator it = priorityQueue.iterator();
while (it.hasNext()) {
@@ -574,41 +548,45 @@
System.out.println("Message " + id + " Type " + type);
}
- System.out.println("***************************************");
+ System.out.println("\n");
}
public void moveOutgoingMsgToBin(String sequenceId, Long messageNo) {
- String sequence = getSequenceOfOutSequence(sequenceId);
+ String sequence = getSequenceOfOutSequence(sequenceId);
ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
.get(sequence);
-
-
+
if (rsh == null) {
- System.out.println("ERROR: RESPONSE SEQ IS NULL");
+ System.out.println("ERROR: Response sequence is NULL " + sequence);
return;
}
synchronized (rsh) {
//Deleting retuns the deleted message.
RMMessageContext msg = rsh.deleteMessage(messageNo);
- String msgId = msg.getMessageID();
+ //If the message has already been deleted, there is nothing to move to the bin.
+ if (msg != null) {
- //Add msg to bin if id isnt null.
- if (msgId != null)
- queueBin.put(msgId, msg);
+ String msgId = msg.getMessageID();
+ System.out
+ .println("INFO: Moving out going messages to bin : msgId "
+ + msgId);
+ //Add msg to bin if id isnt null.
+ if (msgId != null)
+ queueBin.put(msgId, msg);
+ }
}
}
public void movePriorityMsgToBin(String messageId) {
synchronized (priorityQueue) {
-
int size = priorityQueue.size();
-
for (int i = 0; i < size; i++) {
RMMessageContext msg = (RMMessageContext) priorityQueue.get(i);
if (msg.getMessageID().equals(messageId)) {
+
priorityQueue.remove(i);
queueBin.put(messageId, msg);
return;
@@ -616,27 +594,28 @@
}
}
}
-
- public long getNextOutgoingMessageNumber (String seq){
- ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap.get(seq);
- if(rsh==null) //saquence not created yet.
- return 1;
-
- synchronized (rsh){
-
-
- Iterator keys = rsh.getAllKeys().iterator();
-
- long l = 0;
-
- while(keys.hasNext()){
- long temp = ((Long) keys.next()).longValue();
- if(temp>l)
- l = temp;
- }
-
- return (l);
-
+
+ public long getNextOutgoingMessageNumber(String seq) {
+ ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap.get(seq);
+ if (rsh == null) { //saquence not created yet.
+ return 1;
+ }
+
+ synchronized (rsh) {
+ Iterator keys = rsh.getAllKeys().iterator();
+
+ long msgNo = 0;
+
+ while (keys.hasNext()) {
+ long temp = ((Long) keys.next()).longValue();
+ if (temp > msgNo)
+ msgNo = temp;
+
+ }
+ msgNo++;
+
+ return (msgNo);
+
}
}
}