You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/09/07 12:55:13 UTC
cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server/queue ServerQueue.java
jaliya 2004/09/07 03:55:13
Added: sandesha/src/org/apache/sandesha/server/queue
ServerQueue.java
Log:
Implementation of message queue in the Server side.
Revision Changes Path
1.1 ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
/*
* Copyright 1999-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.sandesha.server.queue;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import org.apache.log4j.Priority;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessage;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.RMSequence;
import org.apache.sandesha.RMSequenceContext;
import org.apache.sandesha.server.ReTransmissionProcessor;
/*
* Created on Aug 4, 2004 at 4:49:49 PM
*/
/**
* @author Chamikara Jayalath
* @author Jaliya Ekanayaka
*/
public class ServerQueue {
// Lazily created singleton instance; assigned on the first call to getInstance().
private static ServerQueue queue=null;
// Incoming messages: maps a sequence id to the SequenceHash created for it in createNewSequence().
HashMap sequenceMap;
// Response messages: maps a sequence id to the ResponseSequenceHash created for it in createNewResponseSequence().
HashMap responseMap;
// Priority messages (acks, create-sequence requests and responses), held as RMMessageContext objects in insertion order.
ArrayList responseQueue;
// Messages already taken from the outgoing queues are moved here, keyed by message id.
HashMap responseQueueBin;
/**
 * Creates a queue whose four containers are all empty.
 * Private so that instances are only obtained through {@link #getInstance()}.
 */
private ServerQueue() {
    this.responseQueueBin = new HashMap();
    this.responseQueue = new ArrayList();
    this.responseMap = new HashMap();
    this.sequenceMap = new HashMap();
}
/**
 * Returns the single shared queue, creating it on first use.
 *
 * The method is synchronized so that two threads calling it concurrently
 * for the first time cannot each create (and then use) a different queue.
 *
 * @return the shared ServerQueue instance, never null
 */
public static synchronized ServerQueue getInstance() {
    if (queue == null) {
        queue = new ServerQueue();
    }
    return queue;
}
//This is wrong. No sequence context object is inside queue.
//Only sequence id and messages (in sequence hash).
/*public void addRMSequenceContext(RMSequenceContext seq) throws QueueException{
if(seq==null || seq.getSequenceID()==null || seq.getSequenceID().equals(""))
throw new QueueException("Invalid Sequence");
String seqId = seq.getSequenceID();
createNewSequence(seqId);
}*/
//This is wrong. No sequence context object is inside queue.
//Only sequence id and messages (in sequence hash).
/*public RMSequenceContext getRMSequenceContext(String sequenceID){
if(sequenceID==null)
return null;
Object obj = sequenceMap.get(sequenceID);
if(obj!=null && (obj instanceof RMSequenceContext))
return (RMSequenceContext) sequenceMap.get(sequenceID);
else
return null;
}*/
/**
 * Adds a message to an existing incoming sequence.
 * This will not replace messages automatically: a message number that is
 * already present in the sequence is rejected.
 *
 * @param seqId     id of the sequence to add to; must not be null
 * @param messageNo key under which the message is stored in the sequence
 * @param msgCon    the message to store; must not be null
 * @return true if the message was stored, false if no sequence with the
 *         given id exists
 * @throws QueueException if seqId or msgCon is null, if the sequence entry
 *         exists but holds no SequenceHash, or if the sequence already
 *         has a message with this number
 */
public boolean addMessageToSequence(String seqId,Long messageNo,RMMessageContext msgCon) throws QueueException{
    if (seqId == null || msgCon == null) {
        throw new QueueException("Error in adding message");
    }

    SequenceHash seqHash;
    // Check for the key and fetch the value under one lock so the two
    // cannot disagree when another thread changes the map in between.
    synchronized (sequenceMap) {
        if (!sequenceMap.containsKey(seqId)) {
            return false;
        }
        seqHash = (SequenceHash) sequenceMap.get(seqId);
    }

    // Must be tested before locking: synchronizing on null throws
    // NullPointerException.
    if (seqHash == null) {
        throw new QueueException("Inconsistent queue");
    }

    synchronized (seqHash) {
        // Messages will not be replaced automatically.
        if (seqHash.hasMessage(messageNo)) {
            throw new QueueException("Message already exists");
        }
        seqHash.putNewMessage(messageNo, msgCon);
    }
    return true;
}
/**
 * Adds a message to an existing response sequence.
 *
 * @param seqId  id of the response sequence to add to; must not be null
 * @param msgCon the message to store; must not be null
 * @return true if the message was stored, false if no response sequence
 *         with the given id exists
 * @throws QueueException if seqId or msgCon is null, or if the sequence
 *         entry exists but holds no ResponseSequenceHash
 */
public boolean addMessageToResponseSequence(String seqId,RMMessageContext msgCon) throws QueueException{
    if (seqId == null || msgCon == null) {
        throw new QueueException("Error in adding message");
    }

    ResponseSequenceHash resSeqHash;
    // Check for the key and fetch the value under one lock so the two
    // cannot disagree when another thread changes the map in between.
    synchronized (responseMap) {
        if (!responseMap.containsKey(seqId)) {
            return false;
        }
        resSeqHash = (ResponseSequenceHash) responseMap.get(seqId);
    }

    // Must be tested before locking: synchronizing on null throws
    // NullPointerException.
    if (resSeqHash == null) {
        throw new QueueException("Inconsistent queue");
    }

    synchronized (resSeqHash) {
        resSeqHash.putNewMessage(msgCon);
    }
    return true;
}
/**
 * Tells whether the given incoming sequence already holds a message with
 * the given number.
 *
 * @param sequenceId id of the sequence to look in
 * @param messageNo  message number to look for
 * @return the result of the sequence's hasMessage(messageNo)
 * @throws QueueException if no sequence is stored under sequenceId
 */
public boolean messagePresentInSequence(String sequenceId,Long messageNo) throws QueueException{
    SequenceHash target = (SequenceHash) sequenceMap.get(sequenceId);
    if (target != null) {
        synchronized (target) {
            boolean present = target.hasMessage(messageNo);
            return present;
        }
    }
    throw new QueueException("Sequence not present");
}
/**
 * Tells whether an incoming sequence is registered under the given id.
 *
 * @param seqId sequence id to look up
 * @return true if sequenceMap contains the id as a key
 */
public boolean isSequenceExists(String seqId){
    boolean known;
    synchronized (sequenceMap) {
        known = sequenceMap.containsKey(seqId);
    }
    return known;
}
/**
 * Tells whether a response sequence is registered under the given id.
 *
 * @param resSeqId response sequence id to look up
 * @return true if responseMap contains the id as a key
 */
public boolean isResponseSequenceExists(String resSeqId){
    boolean known;
    synchronized (responseMap) {
        known = responseMap.containsKey(resSeqId);
    }
    return known;
}
/**
 * Finds an incoming sequence that reports processable messages.
 * Sequences are visited in the iteration order of sequenceMap's key set
 * and the first match wins.
 *
 * @return the id of the first sequence whose hasProcessableMessages() is
 *         true, or null if there is none
 */
public String nextSequenceIdToProcess(){
    synchronized (sequenceMap) {
        for (Iterator keys = sequenceMap.keySet().iterator(); keys.hasNext();) {
            String candidateId = (String) keys.next();
            SequenceHash candidate = (SequenceHash) sequenceMap.get(candidateId);
            if (candidate.hasProcessableMessages()) {
                return candidateId;
            }
        }
        return null;
    }
}
/**
 * Returns the next processable message of an incoming sequence.
 *
 * @param sequenceId id of the sequence to read from; null yields null
 * @return the message returned by the sequence's getNextMessageToProcess(),
 *         or null if sequenceId is null or the sequence reports no
 *         processable messages
 * @throws QueueException if no sequence is stored under sequenceId
 */
public RMMessageContext nextMessageToProcess(String sequenceId) throws QueueException{
    if (sequenceId == null) {
        return null;
    }

    SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
    // Must be tested before locking: synchronizing on null throws
    // NullPointerException, which would hide the intended QueueException.
    if (sh == null) {
        throw new QueueException("Sequence id doen not exist");
    }

    synchronized (sh) {
        if (!sh.hasProcessableMessages()) {
            return null;
        }
        return sh.getNextMessageToProcess();
    }
}
/**
 * Picks the next response message to send.
 * Response sequences are visited in the iteration order of responseMap's
 * key set; sequences whose isOutSeqApproved() is false are skipped, and the
 * first non-null result of getNextMessageToSend() is returned.
 *
 * @return the first message found, or null if no approved sequence has one
 * @throws QueueException declared by the signature; not thrown directly here
 */
public RMMessageContext nextResponseMessageToSend() throws QueueException {
    synchronized (responseMap) {
        for (Iterator keys = responseMap.keySet().iterator(); keys.hasNext();) {
            String seqKey = (String) keys.next();
            ResponseSequenceHash responses = (ResponseSequenceHash) responseMap.get(seqKey);
            if (!responses.isOutSeqApproved()) {
                continue;
            }
            RMMessageContext candidate = responses.getNextMessageToSend();
            if (candidate != null) {
                return candidate;
            }
        }
    }
    return null;
}
/**
 * Registers a fresh, empty incoming sequence under the given id.
 * An entry already stored under the same id is replaced by the new one.
 *
 * @param sequenceId id of the sequence to create; must not be null
 * @throws QueueException if sequenceId is null
 */
public void createNewSequence(String sequenceId) throws QueueException {
    if (sequenceId == null) {
        throw new QueueException("Sequence Id is null");
    }
    synchronized (sequenceMap) {
        sequenceMap.put(sequenceId, new SequenceHash(sequenceId));
    }
}
/**
 * Registers a fresh, empty response sequence under the given id.
 * An entry already stored under the same id is replaced by the new one.
 *
 * @param sequenceId id of the response sequence to create; must not be null
 * @throws QueueException if sequenceId is null
 */
public void createNewResponseSequence(String sequenceId) throws QueueException {
    if (sequenceId == null) {
        throw new QueueException("Sequence Id is null");
    }
    synchronized (responseMap) {
        responseMap.put(sequenceId, new ResponseSequenceHash(sequenceId));
    }
}
/**
 * Appends a message to the end of the priority (responses) queue.
 *
 * @param msg the message to queue; must not be null
 * @throws QueueException if msg is null
 */
public void addPriorityMessage(RMMessageContext msg) throws QueueException {
    if (msg == null) {
        throw new QueueException("Message is null");
    }
    synchronized (responseQueue) {
        responseQueue.add(msg);
    }
}
/**
 * Picks the next message to send from the priority queue, scanning it from
 * the front.
 *
 * Create-sequence requests stay in the queue: one is returned only when at
 * least Constants.RETRANSMISSION_INTERVAL has passed since its last-sent
 * time, and that time is then updated to now. Any other message type is
 * removed from the queue, stored in the bin under its message id, and
 * returned. Null entries are skipped.
 *
 * @return the first message that qualifies, or null if none does
 * @throws QueueException declared by the signature; not thrown directly here
 */
public RMMessageContext nextPriorityMessageToSend() throws QueueException{
    synchronized (responseQueue) {
        final int queued = responseQueue.size();
        for (int idx = 0; idx < queued; idx++) {
            RMMessageContext candidate = (RMMessageContext) responseQueue.get(idx);
            if (candidate == null) {
                continue;
            }

            if (candidate.getMessageType() != Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST) {
                // Other msgs will be removed.
                // These include CreateSeqResponses and Acknowledgements.
                responseQueue.remove(idx);
                responseQueueBin.put(candidate.getMessageID(), candidate);
                return candidate;
            }

            // Create seq messages will not be removed; they are re-sent
            // once the retransmission interval has elapsed.
            long lastSent = candidate.getLastSentTime();
            long now = System.currentTimeMillis();
            if (now >= lastSent + Constants.RETRANSMISSION_INTERVAL) {
                candidate.setLastSentTime(now);
                return candidate;
            }
        }
        return null;
    }
}
/*public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
if(sh==null)
return null;
synchronized (sh) {
if(!sh.hasNewMessages())
return null;
Long key = sh.
}
}*/
/**
 * Returns all messages of an incoming sequence that are ready to process.
 *
 * @param sequenceId id of the sequence to read from
 * @return the Vector returned by the sequence's getNextMessagesToProcess()
 * @throws QueueException if no sequence is stored under sequenceId
 */
public Vector nextAllMessagesToProcess(String sequenceId) throws QueueException{
    SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
    // Must be tested before locking: synchronizing on null throws
    // NullPointerException instead of the declared QueueException.
    if (sh == null) {
        throw new QueueException("Sequence not present");
    }
    synchronized (sh) {
        return sh.getNextMessagesToProcess();
    }
}
//The following function may cause errors.
/*public Vector nextAllResponseMessagesToSend(String sequenceId) throws QueueException{
ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
Vector v = new Vector();
synchronized (rsh){
RMMessageContext msg = nextAllResponseMessagesToSend()
while(msg!=null){
v.add(msg);
msg = rsh.getNextMessageToSend();
}
return v;
}
}*/
/**
 * Collects the ids of all incoming sequences that report processable
 * messages and are not locked.
 *
 * @return a new Vector holding getSequenceId() of every sequence for which
 *         hasProcessableMessages() is true and isSequenceLocked() is false;
 *         empty if there is none
 */
public Vector nextAllSeqIdsToProcess(){
    synchronized (sequenceMap) {
        Vector readyIds = new Vector();
        for (Iterator keys = sequenceMap.keySet().iterator(); keys.hasNext();) {
            SequenceHash candidate = (SequenceHash) sequenceMap.get(keys.next());
            if (!candidate.hasProcessableMessages() || candidate.isSequenceLocked()) {
                continue;
            }
            readyIds.add(candidate.getSequenceId());
        }
        return readyIds;
    }
}
/*public Vector nextAllResponseSeqIdsToSend(){
Vector ids = new Vector();
synchronized (responseMap){
Iterator it = responseMap.keySet().iterator();
while(it.hasNext()){
Object tempKey = it.next();
ResponseSequenceHash sh = (ResponseSequenceHash) responseMap.get(tempKey);
if(sh.hasProcessableMessages())
ids.add(sh.getSequenceId());
}
}
return ids;
}*/
/**
 * Empties all four containers of the queue.
 *
 * @param yes confirmation flag; nothing is cleared unless it is true
 */
public void clear(boolean yes){
    if (yes) {
        responseQueueBin.clear();
        responseMap.clear();
        responseQueue.clear();
        sequenceMap.clear();
    }
}
/**
 * Asks an incoming sequence to clear itself via clearSequence(yes).
 * Does nothing if the flag is false or no sequence is stored under the id.
 *
 * @param seqId id of the sequence to clear
 * @param yes   confirmation flag; nothing happens unless it is true
 */
public void removeAllMsgsFromSeqence(String seqId,boolean yes){
    if (!yes) {
        return;
    }
    SequenceHash sh = (SequenceHash) sequenceMap.get(seqId);
    // An unknown sequence has nothing to clear; avoid a NullPointerException.
    if (sh == null) {
        return;
    }
    sh.clearSequence(yes);
}
/**
 * Asks a response sequence to clear itself via clearSequence(yes).
 * Does nothing if the flag is false or no response sequence is stored
 * under the id.
 *
 * @param seqId id of the response sequence to clear
 * @param yes   confirmation flag; nothing happens unless it is true
 */
public void removeAllMsgsFromResponseSeqence(String seqId,boolean yes){
    if (!yes) {
        return;
    }
    ResponseSequenceHash sh = (ResponseSequenceHash) responseMap.get(seqId);
    // An unknown sequence has nothing to clear; avoid a NullPointerException.
    if (sh == null) {
        return;
    }
    sh.clearSequence(yes);
}
/**
 * Removes an incoming sequence and everything stored in it from the queue.
 *
 * The removal is done while holding the sequenceMap lock, the same lock
 * used by the methods that create, look up and iterate over sequences, so
 * that a removal cannot interleave with one of those operations.
 *
 * @param sequenceId id of the sequence to remove
 * @param yes        confirmation flag; nothing happens unless it is true
 */
public void removeSequence(String sequenceId,boolean yes){
    if (!yes) {
        return;
    }
    synchronized (sequenceMap) {
        sequenceMap.remove(sequenceId);
    }
}
/**
 * Removes a response sequence from the queue while holding the
 * responseMap lock.
 *
 * @param sequenceId id of the response sequence to remove
 * @param yes        confirmation flag; nothing happens unless it is true
 */
public void removeResponseSequence(String sequenceId,boolean yes){
    if (yes) {
        synchronized (responseMap) {
            responseMap.remove(sequenceId);
        }
    }
}
/**
 * Passes the given lock flag to the sequence's setProcessLock().
 * Does nothing if no sequence is stored under the id.
 *
 * @param sequenceId id of the sequence whose lock flag is changed
 * @param lock       value handed to setProcessLock()
 */
public void setSequenceLock(String sequenceId,boolean lock){
    SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
    // The sequence may not exist (yet, or any more); avoid a NullPointerException.
    if (sh == null) {
        return;
    }
    sh.setProcessLock(lock);
}
/**
 * Returns the keys of all messages stored in an incoming sequence.
 *
 * @param sequenceId id of the sequence to inspect
 * @return the Set returned by the sequence's getAllKeys(), or a new empty
 *         Set if no sequence is stored under the id
 */
public Set getAllReceivedMsgNumsOfSeq(String sequenceId){
    SequenceHash sh = (SequenceHash) sequenceMap.get(sequenceId);
    // An unknown sequence has received nothing; report that as an empty set
    // rather than failing with a NullPointerException.
    if (sh == null) {
        return new java.util.HashSet();
    }
    // Lock the sequence while reading, as the response-sequence variant does.
    synchronized (sh) {
        return sh.getAllKeys();
    }
}
/**
 * Returns the keys of all messages stored in a response sequence.
 *
 * @param sequenceId id of the response sequence to inspect
 * @return the Set returned by the sequence's getAllKeys(), or a new empty
 *         Set if no response sequence is stored under the id
 */
public Set getAllReceivedMsgNumsOfResponseSeq(String sequenceId){
    ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
    // Must be tested before locking: synchronizing on null throws
    // NullPointerException. An unknown sequence is reported as an empty set.
    if (rsh == null) {
        return new java.util.HashSet();
    }
    synchronized (rsh) {
        return rsh.getAllKeys();
    }
}
/**
 * Tells whether an incoming sequence holds a message with the given number.
 * Unlike messagePresentInSequence(), an unknown sequence is not an error.
 *
 * @param sequenceId id of the sequence to look in
 * @param messageNo  message number to look for
 * @return the sequence's hasMessage(messageNo), or false if no sequence is
 *         stored under sequenceId
 */
public boolean isMessageExists(String sequenceId,Long messageNo){
    SequenceHash target = (SequenceHash) sequenceMap.get(sequenceId);
    // target can be null if there are no messages at the initial point.
    return target != null && target.hasMessage(messageNo);
}
/**
 * Stores an outgoing sequence id on the response sequence registered
 * under seqId. If there is no such response sequence, an error line is
 * printed to standard output and nothing is changed.
 *
 * @param seqId    id under which the response sequence is registered
 * @param outSeqId value handed to the sequence's setOutSequenceId()
 */
public void setOutSequence(String seqId,String outSeqId){
    ResponseSequenceHash target = (ResponseSequenceHash) responseMap.get(seqId);
    if (target != null) {
        synchronized (target) {
            target.setOutSequenceId(outSeqId);
        }
    } else {
        System.out.println("ERROR: RESPONSE SEQ IS NULL");
    }
}
/**
 * Sets the "out sequence approved" flag on the response sequence
 * registered under seqId. If there is no such response sequence, an error
 * line is printed to standard output and nothing is changed.
 *
 * @param seqId    id under which the response sequence is registered
 * @param approved value handed to the sequence's setOutSeqApproved()
 */
public void setOutSequenceApproved(String seqId,boolean approved){
    ResponseSequenceHash target = (ResponseSequenceHash) responseMap.get(seqId);
    if (target != null) {
        synchronized (target) {
            target.setOutSeqApproved(approved);
        }
    } else {
        System.out.println("ERROR: RESPONSE SEQ IS NULL");
    }
}
/**
 * Finds the id under which a response sequence is registered, given its
 * outgoing sequence id.
 *
 * @param outSequence outgoing sequence id to search for; null yields null
 * @return the key of the first response sequence whose getOutSequenceId()
 *         equals outSequence, or null if there is none
 */
public String getSequenceOfOutSequence(String outSequence){
    if (outSequence == null) {
        return null;
    }
    synchronized (responseMap) {
        // The iterator is created inside the lock: HashMap iterators are
        // fail-fast, so one obtained before the lock could throw
        // ConcurrentModificationException if the map changed in between.
        Iterator it = responseMap.keySet().iterator();
        while (it.hasNext()) {
            String tempSeqId = (String) it.next();
            ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(tempSeqId);
            String tempOutSequence = rsh.getOutSequenceId();
            if (outSequence.equals(tempOutSequence)) {
                return tempSeqId;
            }
        }
    }
    return null;
}
/**
 * Debug helper: prints every response sequence id to standard output,
 * followed by the key and message id of each message it holds.
 */
public void displayResponseMap(){
    final String border = "***************************************";
    Iterator seqIds = responseMap.keySet().iterator();
    System.out.println(border);
    System.out.println(" DISPLAYING RESPONSE MAP");
    System.out.println(" -----------------------");
    while (seqIds.hasNext()) {
        String seqId = (String) seqIds.next();
        System.out.println("\n Sequence id - "+seqId);
        ResponseSequenceHash responses = (ResponseSequenceHash) responseMap.get(seqId);
        for (Iterator msgKeys = responses.getAllKeys().iterator(); msgKeys.hasNext();) {
            Long msgKey = (Long) msgKeys.next();
            String msgId = responses.getMessageId(msgKey);
            System.out.println("* key -"+msgKey.longValue()+"- MessageID -"+msgId+"-");
        }
    }
    System.out.println(border);
}
/**
 * Debug helper: prints every incoming sequence id to standard output,
 * followed by the key and message id of each message it holds.
 */
public void displaySequenceMap(){
    final String border = "***************************************";
    Iterator seqIds = sequenceMap.keySet().iterator();
    System.out.println(border);
    System.out.println(" DISPLAYING SEQUENCE MAP");
    System.out.println(" -----------------------");
    while (seqIds.hasNext()) {
        String seqId = (String) seqIds.next();
        System.out.println("\n Sequence id - "+seqId);
        SequenceHash messages = (SequenceHash) sequenceMap.get(seqId);
        for (Iterator msgKeys = messages.getAllKeys().iterator(); msgKeys.hasNext();) {
            Long msgKey = (Long) msgKeys.next();
            String msgId = messages.getMessageId(msgKey);
            System.out.println("* key -"+msgKey.longValue()+"- MessageID -"+msgId+"-");
        }
    }
    System.out.println(border);
}
/**
 * Debug helper: prints the message id and message type of every entry in
 * the priority queue to standard output, in queue order.
 */
public void displayPriorityQueue(){
    final String border = "***************************************";
    System.out.println(border);
    System.out.println(" DISPLAYING PRIORITY QUEUE");
    System.out.println(" -------------------------");
    for (Iterator entries = responseQueue.iterator(); entries.hasNext();) {
        RMMessageContext entry = (RMMessageContext) entries.next();
        String entryId = entry.getMessageID();
        int entryType = entry.getMessageType();
        System.out.println("Message "+entryId+" Type "+entryType);
    }
    System.out.println(border);
}
/**
 * Deletes a message from a response sequence and stores it in the bin
 * under its message id.
 *
 * If no response sequence is registered under sequenceId, an error line is
 * printed to standard output. If the sequence returns no message for
 * messageNo, or the message has no id, nothing is added to the bin.
 *
 * @param sequenceId id under which the response sequence is registered
 * @param messageNo  key of the message to delete from that sequence
 */
public void moveResponseMsgToBin(String sequenceId,Long messageNo){
    ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(sequenceId);
    if (rsh == null) {
        System.out.println("ERROR: RESPONSE SEQ IS NULL");
        return;
    }
    synchronized (rsh) {
        // Deleting returns the deleted message.
        RMMessageContext msg = rsh.deleteMessage(messageNo);
        // NOTE(review): presumably deleteMessage() returns null when the
        // sequence has no such message - confirm against ResponseSequenceHash.
        // Guarding here avoids a NullPointerException in that case.
        if (msg == null) {
            return;
        }
        String msgId = msg.getMessageID();
        // Add msg to bin if id isn't null.
        if (msgId != null) {
            responseQueueBin.put(msgId, msg);
        }
    }
}
/**
 * Moves the first priority-queue message with the given id into the bin.
 * Entries that are null or have no message id are skipped. If no entry
 * matches, nothing is changed.
 *
 * @param messageId id of the message to move
 */
public void movePriorityMsgToBin(String messageId){
    synchronized (responseQueue) {
        int size = responseQueue.size();
        for (int i = 0; i < size; i++) {
            RMMessageContext msg = (RMMessageContext) responseQueue.get(i);
            if (msg == null) {
                continue;
            }
            // Message ids may be null, so test before calling equals() on one.
            String id = msg.getMessageID();
            if (id != null && id.equals(messageId)) {
                responseQueue.remove(i);
                responseQueueBin.put(messageId, msg);
                return;
            }
        }
    }
}
}