You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mina.apache.org by Bernd Fondermann <bf...@brainlounge.de> on 2012/04/19 00:09:12 UTC

[VYSPER] Changes in BOSH [WAS: Re: svn commit: r1327524]

Hi,

I made some bigger changes to the BOSH component, esp. 
BoshBackedSessionContext.

Any comments are very welcome.
Especially, please provide feedback if anyone is still running into 
VYSPER-304 with Vysper trunk.

Thanks,

   Bernd

On 18.04.12 16:36, berndf@apache.org wrote:
> Author: berndf
> Date: Wed Apr 18 14:36:32 2012
> New Revision: 1327524
>
> URL: http://svn.apache.org/viewvc?rev=1327524&view=rev
> Log:
> VYSPER-304, VYSPER-305: BOSH: guard central context state fields against race conditions, make some methods and var names more expressive, add debug logging, use static final object for empty stanza, do not queue empty stanzas
>
> Modified:
>      mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
>      mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
>      mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshServlet.java
>      mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java
>      mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandlerTest.java
>
> Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
> URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java?rev=1327524&r1=1327523&r2=1327524&view=diff
> ==============================================================================
> --- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java (original)
> +++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java Wed Apr 18 14:36:32 2012
> @@ -24,6 +24,7 @@ import java.util.Queue;
>   import java.util.SortedMap;
>   import java.util.TreeMap;
>
> +import org.apache.commons.lang.StringUtils;
>   import org.apache.vysper.xml.fragment.Renderer;
>   import org.apache.vysper.xml.fragment.XMLElement;
>   import org.apache.vysper.xmpp.protocol.SessionStateHolder;
> @@ -69,19 +70,19 @@ public class BoshBackedSessionContext ex
>        *
>        * The BOSH requests are sorted by their RIDs.
>        */
> -    private final SortedMap<Long, BoshRequest>  requestsWindow;
> +    private final SortedMap<Long, BoshRequest>  requestsWindow = new TreeMap<Long, BoshRequest>();
>
>       /*
>        * Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
>        * no available HTTP requests to respond to (requestsWindow is empty).
>        */
> -    private final Queue<Stanza>  delayedResponseQueue;
> +    private final Queue<Stanza>  delayedResponseQueue = new LinkedList<Stanza>();
>
>       /*
>        * A cache of sent responses to the BOSH client, kept in the event of delivery failure and retransmission requests.
>        * See Broken Connections in XEP-0124.
>        */
> -    private final SortedMap<Long, BoshResponse>  sentResponses;
> +    private final SortedMap<Long, BoshResponse>  sentResponses = new TreeMap<Long, BoshResponse>();
>
>       private int requests = 2;
>
> @@ -99,11 +100,20 @@ public class BoshBackedSessionContext ex
>        * The highest RID that can be read and processed, this is the highest (rightmost) contiguous RID.
>        * The requests from the client can come theoretically with missing updates:
>        * rid_1, rid_2, rid_4 (missing rid_3, highestReadRid is rid_2)
> +     *
> +     * must be synchronized along with requestsWindow
>        */
>       private Long highestReadRid = null;
> -
> +
> +    /**
> +     *
> +     * must be synchronized along with requestsWindow
> +     */
>       private Long currentProcessingRequest = null;
> -
> +
> +    /**
> +     * must be synchronized along with requestsWindow
> +     */
>       private BoshRequest latestEmptyPollingRequest = null;
>
>       /*
> @@ -137,10 +147,7 @@ public class BoshBackedSessionContext ex
>           sessionStateHolder.setState(SessionState.ENCRYPTED);
>
>           this.boshHandler = boshHandler;
> -        requestsWindow = new TreeMap<Long, BoshRequest>();
> -        delayedResponseQueue = new LinkedList<Stanza>();
> -        sentResponses = new TreeMap<Long, BoshResponse>();
> -
> +
>           this.inactivityChecker = inactivityChecker;
>           updateInactivityChecker();
>       }
> @@ -149,11 +156,11 @@ public class BoshBackedSessionContext ex
>           return isWatchedByInactivityChecker;
>       }
>
> -    private void updateInactivityChecker() {
> +    private synchronized void updateInactivityChecker() {
>           Long newInactivityExpireTime = null;
>           if (requestsWindow.isEmpty()) {
>               newInactivityExpireTime = latestWriteTimestamp + currentInactivity * 1000;
> -            if (newInactivityExpireTime == lastInactivityExpireTime) {
> +            if (newInactivityExpireTime.equals(lastInactivityExpireTime)) {
>                   return;
>               }
>           } else if (!isWatchedByInactivityChecker) {
> @@ -189,7 +196,9 @@ public class BoshBackedSessionContext ex
>        * This method is synchronized on the session object to prevent concurrent writes to the same BOSH client
>        */
>       synchronized public void write(Stanza stanza) {
> -        write0(boshHandler.wrapStanza(stanza));
> +        if (stanza == null) throw new IllegalArgumentException("stanza must not be null.");
> +        LOGGER.debug("adding server stanza for writing to BOSH client");
> +        writeBOSHResponse(boshHandler.wrapStanza(stanza));
>       }
>
>       /**
> @@ -198,29 +207,43 @@ public class BoshBackedSessionContext ex
>        *<p>
>        * (package access)
>        *
> -     * @param response The BOSH response to write
> +     * @param responseStanza The BOSH response to write
>        */
> -    void write0(Stanza response) {
> +    /*package*/ void writeBOSHResponse(Stanza responseStanza) {
> +        if (responseStanza == null) throw new IllegalArgumentException();
> +        final boolean isEmtpyResponse = responseStanza == BoshHandler.EMPTY_BOSH_RESPONSE;
> +
>           BoshRequest req;
> -        if (requestsWindow.isEmpty() || requestsWindow.firstKey()>  highestReadRid) {
> -            delayedResponseQueue.offer(response);
> -            return;
> -        } else {
> +        BoshResponse boshResponse;
> +        synchronized (requestsWindow) {
> +            if (requestsWindow.isEmpty() || requestsWindow.firstKey()>  highestReadRid) {
> +                if (isEmtpyResponse) return; // do not delay empty responses
> +                final boolean accepted = delayedResponseQueue.offer(responseStanza);
> +                if (!accepted) {
> +                    LOGGER.debug("rid = {} - request not queued. BOSH delayedResponseQueue is full: {}",
> +                            requestsWindow.firstKey(), delayedResponseQueue.size());
> +                    // TODO do not silently drop this stanza
> +                }
> +                return;
> +            }
>               req = requestsWindow.remove(requestsWindow.firstKey());
> +            boshResponse = getBoshResponse(responseStanza, req.getRid().equals(highestReadRid) ? null : highestReadRid);
> +            if (LOGGER.isDebugEnabled()) {
> +                String emptyHint = isEmtpyResponse ? "empty " : StringUtils.EMPTY;
> +                LOGGER.debug("rid = " + req.getRid() + " - BOSH writing {}response: {}", emptyHint, new String(boshResponse.getContent()));
> +            }
>           }
> -        BoshResponse boshResponse = getBoshResponse(response, req.getRid().equals(highestReadRid) ? null : highestReadRid);
> -        if (LOGGER.isDebugEnabled()) {
> -            LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
> -        }
> -
> -        if (isResponseSavable(req, response)) {
> -            sentResponses.put(req.getRid(), boshResponse);
> -            // The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
> -            // number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
> -            // that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
> -            // the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
> -            if (sentResponses.size()>  maximumSentResponses || (!isClientAcknowledgements()&&  sentResponses.size()>  requests)) {
> -                sentResponses.remove(sentResponses.firstKey());
> +
> +        if (isResponseSavable(req, responseStanza)) {
> +            synchronized (sentResponses) {
> +                sentResponses.put(req.getRid(), boshResponse);
> +                // The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
> +                // number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
> +                // that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
> +                // the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
> +                if (sentResponses.size()>  maximumSentResponses || (!isClientAcknowledgements()&&  sentResponses.size()>  requests)) {
> +                    sentResponses.remove(sentResponses.firstKey());
> +                }
>               }
>           }
>
> @@ -231,7 +254,7 @@ public class BoshBackedSessionContext ex
>           updateInactivityChecker();
>       }
>
> -    private boolean isResponseSavable(BoshRequest req, Stanza response) {
> +    private static boolean isResponseSavable(BoshRequest req, Stanza response) {
>           // responses to pause requests are not saved
>           if (req.getBody().getAttributeValue("pause") != null) {
>               return false;
> @@ -255,13 +278,14 @@ public class BoshBackedSessionContext ex
>        * @param condition the error condition
>        */
>       private void error(BoshRequest br, String condition) {
> -        requestsWindow.put(br.getRid(), br);
> +        final Long rid = br.getRid();
> +        requestsWindow.put(rid, br);
>           BoshRequest req = requestsWindow.remove(requestsWindow.firstKey());
>           Stanza body = boshHandler.getTerminateResponse();
>           body = boshHandler.addAttribute(body, "condition", condition);
>           BoshResponse boshResponse = getBoshResponse(body, null);
>           if (LOGGER.isDebugEnabled()) {
> -            LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
> +            LOGGER.debug("rid = {} - BOSH writing response: {}", rid, new String(boshResponse.getContent()));
>           }
>           Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
>           continuation.setAttribute("response", boshResponse);
> @@ -279,7 +303,7 @@ public class BoshBackedSessionContext ex
>               Stanza body = boshHandler.getTerminateResponse();
>               BoshResponse boshResponse = getBoshResponse(body, null);
>               if (LOGGER.isDebugEnabled()) {
> -                LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
> +                LOGGER.debug("rid = {} - BOSH writing response: {}", req.getRid(), new String(boshResponse.getContent()));
>               }
>               Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
>               continuation.setAttribute("response", boshResponse);
> @@ -443,8 +467,9 @@ public class BoshBackedSessionContext ex
>               LOGGER.warn("Continuation expired without having an associated request!");
>               return;
>           }
> +        LOGGER.debug("rid = {} - BOSH request expired", req.getRid());
>           while (!requestsWindow.isEmpty()&&  requestsWindow.firstKey()<= req.getRid()) {
> -            write0(boshHandler.getEmptyResponse());
> +            writeBOSHResponse(BoshHandler.EMPTY_BOSH_RESPONSE);
>           }
>       }
>
> @@ -455,6 +480,10 @@ public class BoshBackedSessionContext ex
>        * @param req the HTTP request
>        */
>       public void insertRequest(BoshRequest br) {
> +
> +        final Long rid = br.getRid();
> +        LOGGER.debug("rid = {} - inserting new BOSH request", rid);
> +
>           // reset the inactivity
>           currentInactivity = inactivity;
>
> @@ -463,78 +492,87 @@ public class BoshBackedSessionContext ex
>           continuation.setTimeout(wait * 1000);
>           continuation.setAttribute("request", br);
>           continuation.suspend();
> -
> -        if (highestReadRid != null&&  highestReadRid + requests<  br.getRid()) {
> -            LOGGER.warn("BOSH received RID greater than the permitted window of concurrent requests");
> -            error(br, "item-not-found");
> -            return;
> -        }
> -        if (highestReadRid != null&&  br.getRid()<= highestReadRid) {
> -            if (sentResponses.containsKey(br.getRid())) {
> -                // Resending the old response
> -                resendResponse(br);
> -            } else {
> -                LOGGER.warn("BOSH response not in buffer error");
> +
> +        final int currentRequests = requests;
> +        synchronized (requestsWindow) {
> +            if (highestReadRid != null&&  highestReadRid + currentRequests<  rid) {
> +                LOGGER.warn("rid = {} - received RID>= the permitted window of concurrent requests ({})",
> +                        rid, highestReadRid);
>                   error(br, "item-not-found");
> +                return;
>               }
> -            return;
> -        }
> -        if (requestsWindow.size() + 1>  requests&&  !"terminate".equals(br.getBody().getAttributeValue("type"))
> -&&  br.getBody().getAttributeValue("pause") == null) {
> -            LOGGER.warn("BOSH Overactivity: Too many simultaneous requests");
> -            error(br, "policy-violation");
> -            return;
> -        }
> -        if (requestsWindow.size() + 1 == requests&&  !"terminate".equals(br.getBody().getAttributeValue("type"))
> -&&  br.getBody().getAttributeValue("pause") == null&&  br.getBody().getInnerElements().isEmpty()) {
> -            if (!requestsWindow.isEmpty()
> -&&  br.getTimestamp() - requestsWindow.get(requestsWindow.lastKey()).getTimestamp()<  polling * 1000) {
> -                LOGGER.warn("BOSH Overactivity: Too frequent requests");
> -                error(br, "policy-violation");
> +            if (highestReadRid != null&&  rid<= highestReadRid) {
> +                synchronized (sentResponses) {
> +                    if (sentResponses.containsKey(rid)) {
> +                        // Resending the old response
> +                        resendResponse(br);
> +                    } else {
> +                        LOGGER.warn("rid = {} - BOSH response not in buffer error");
> +                        error(br, "item-not-found");
> +                    }
> +                }
>                   return;
>               }
> -        }
> -        if ((wait == 0 || hold == 0)&&  br.getBody().getInnerElements().isEmpty()) {
> -            if (latestEmptyPollingRequest != null&&  br.getTimestamp() - latestEmptyPollingRequest.getTimestamp()<  polling * 1000) {
> -                LOGGER.warn("BOSH Overactivity for polling: Too frequent requests");
> +            if (requestsWindow.size() + 1>  currentRequests&&  !"terminate".equals(br.getBody().getAttributeValue("type"))
> +&&  br.getBody().getAttributeValue("pause") == null) {
> +                LOGGER.warn("BOSH Overactivity: Too many simultaneous requests");
>                   error(br, "policy-violation");
>                   return;
>               }
> -            latestEmptyPollingRequest = br;
> -        }
> -
> -        requestsWindow.put(br.getRid(), br);
> -        updateInactivityChecker();
> -
> -        if (highestReadRid == null) {
> -            highestReadRid = br.getRid();
> -        }
> -        for (;;) {
> -            // update the highestReadRid to the latest value
> -            // it is possible to have higher RIDs than the highestReadRid with a gap between them (e.g. lost client request)
> -            if (requestsWindow.containsKey(highestReadRid + 1)) {
> -                highestReadRid++;
> -            } else {
> -                break;
> +            if (requestsWindow.size() + 1 == currentRequests&&  !"terminate".equals(br.getBody().getAttributeValue("type"))
> +&&  br.getBody().getAttributeValue("pause") == null&&  br.getBody().getInnerElements().isEmpty()) {
> +                if (!requestsWindow.isEmpty()
> +&&  br.getTimestamp() - requestsWindow.get(requestsWindow.lastKey()).getTimestamp()<  polling * 1000) {
> +                    LOGGER.warn("BOSH Overactivity: Too frequent requests");
> +                    error(br, "policy-violation");
> +                    return;
> +                }
> +            }
> +            if ((wait == 0 || hold == 0)&&  br.getBody().getInnerElements().isEmpty()) {
> +                if (latestEmptyPollingRequest != null&&  br.getTimestamp() - latestEmptyPollingRequest.getTimestamp()<  polling * 1000) {
> +                    LOGGER.warn("BOSH Overactivity for polling: Too frequent requests");
> +                    error(br, "policy-violation");
> +                    return;
> +                }
> +                latestEmptyPollingRequest = br;
> +            }
> +
> +            requestsWindow.put(rid, br);
> +            updateInactivityChecker();
> +
> +            if (highestReadRid == null) {
> +                highestReadRid = rid;
> +            }
> +            for (;;) {
> +                // update the highestReadRid to the latest value
> +                // it is possible to have higher RIDs than the highestReadRid with a gap between them (e.g. lost client request)
> +                if (requestsWindow.containsKey(highestReadRid + 1)) {
> +                    highestReadRid++;
> +                } else {
> +                    break;
> +                }
>               }
>           }
> -
> +
> +
>           if (isClientAcknowledgements()) {
> -            if (br.getBody().getAttribute("ack") == null) {
> -                // if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
> -                // and we clear the cache
> -                sentResponses.clear();
> -            } else if (!sentResponses.isEmpty()) {
> -                // After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
> -                // the connection manager MAY inform the client of the situation. In this case it SHOULD include a 'report' attribute set
> -                // to one greater than the 'ack' attribute it received from the client, and a 'time' attribute set to the number of milliseconds
> -                // since it sent the response associated with the 'report' attribute.
> -                long ack = Long.parseLong(br.getBody().getAttributeValue("ack"));
> -                if (ack<  sentResponses.lastKey()&&  sentResponses.containsKey(ack + 1)) {
> -                    long delta = System.currentTimeMillis() - sentResponses.get(ack + 1).getTimestamp();
> -                    if (delta>= brokenConnectionReportTimeout) {
> -                        sendBrokenConnectionReport(ack + 1, delta);
> -                        return;
> +            synchronized (sentResponses) {
> +                if (br.getBody().getAttribute("ack") == null) {
> +                    // if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
> +                    // and we clear the cache
> +                    sentResponses.clear();
> +                } else if (!sentResponses.isEmpty()) {
> +                    // After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
> +                    // the connection manager MAY inform the client of the situation. In this case it SHOULD include a 'report' attribute set
> +                    // to one greater than the 'ack' attribute it received from the client, and a 'time' attribute set to the number of milliseconds
> +                    // since it sent the response associated with the 'report' attribute.
> +                    long ack = Long.parseLong(br.getBody().getAttributeValue("ack"));
> +                    if (ack<  sentResponses.lastKey()&&  sentResponses.containsKey(ack + 1)) {
> +                        long delta = System.currentTimeMillis() - sentResponses.get(ack + 1).getTimestamp();
> +                        if (delta>= brokenConnectionReportTimeout) {
> +                            sendBrokenConnectionReport(ack + 1, delta);
> +                            return;
> +                        }
>                       }
>                   }
>               }
> @@ -542,19 +580,21 @@ public class BoshBackedSessionContext ex
>
>           // we cannot pause if there are missing requests, this is tested with
>           // br.getRid().equals(requestsWindow.lastKey())&&  highestReadRid.equals(br.getRid())
> -        if (br.getBody().getAttribute("pause") != null&&  br.getRid().equals(requestsWindow.lastKey())&&  highestReadRid.equals(br.getRid())) {
> -            int pause = Integer.parseInt(br.getBody().getAttributeValue("pause"));
> -            if (pause>  maxpause) {
> -                // do not allow to pause more than maxpause
> -                pause = maxpause;
> -            }
> -            if (pause<  0) {
> -                pause = 0;
> +        synchronized (requestsWindow) {
> +            if (br.getBody().getAttribute("pause") != null&&  rid.equals(requestsWindow.lastKey())&&  highestReadRid.equals(rid)) {
> +                int pause = Integer.parseInt(br.getBody().getAttributeValue("pause"));
> +                if (pause>  maxpause) {
> +                    // do not allow to pause more than maxpause
> +                    pause = maxpause;
> +                }
> +                if (pause<  0) {
> +                    pause = 0;
> +                }
> +                respondToPause(pause);
> +                return;
>               }
> -            respondToPause(pause);
> -            return;
>           }
> -
> +
>           // If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
>           // a<body/>  element and send them as a HTTP response to the current HTTP request.
>           Stanza delayedResponse;
> @@ -563,14 +603,14 @@ public class BoshBackedSessionContext ex
>               mergedResponse = boshHandler.mergeResponses(mergedResponse, delayedResponse);
>           }
>           if (mergedResponse != null) {
> -            write0(mergedResponse);
> +            writeBOSHResponse(mergedResponse);
>               return;
>           }
>
>           // If there are more suspended enqueued requests than it is allowed by the BOSH 'hold' parameter,
>           // than we release the oldest one by sending an empty response.
>           if (requestsWindow.size()>  hold) {
> -            write0(boshHandler.getEmptyResponse());
> +            writeBOSHResponse(BoshHandler.EMPTY_BOSH_RESPONSE);
>           }
>       }
>
> @@ -582,7 +622,7 @@ public class BoshBackedSessionContext ex
>               if (boshRequest == null) {
>                   break;
>               }
> -            write0(boshHandler.getEmptyResponse());
> +            writeBOSHResponse(BoshHandler.EMPTY_BOSH_RESPONSE);
>           }
>       }
>
> @@ -590,7 +630,7 @@ public class BoshBackedSessionContext ex
>           Stanza body = boshHandler.getTerminateResponse();
>           body = boshHandler.addAttribute(body, "report", Long.toString(report));
>           body = boshHandler.addAttribute(body, "time", Long.toString(delta));
> -        write0(body);
> +        writeBOSHResponse(body);
>       }
>
>       private void addContinuationExpirationListener(Continuation continuation) {
> @@ -609,9 +649,14 @@ public class BoshBackedSessionContext ex
>       }
>
>       private void resendResponse(BoshRequest br) {
> -        BoshResponse boshResponse = sentResponses.get(br.getRid());
> +        final Long rid = br.getRid();
> +        BoshResponse boshResponse = sentResponses.get(rid);
> +        if (boshResponse == null) {
> +            LOGGER.debug("rid = {} - BOSH response could not (no longer) be retrieved for resending.", rid);
> +            return;
> +        }
>           if (LOGGER.isDebugEnabled()) {
> -            LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
> +            LOGGER.debug("rid = {} - BOSH writing response (resending): {}", rid, new String(boshResponse.getContent()));
>           }
>           Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
>           continuation.setAttribute("response", boshResponse);
> @@ -634,17 +679,19 @@ public class BoshBackedSessionContext ex
>        * @return the next (by RID order) body to process
>        */
>       public BoshRequest getNextRequest() {
> -        if (requestsWindow.isEmpty()) {
> -            return null;
> -        }
> -        if (currentProcessingRequest == null || currentProcessingRequest<  requestsWindow.firstKey()) {
> -            currentProcessingRequest = requestsWindow.firstKey();
> -        }
> -        if (currentProcessingRequest>  highestReadRid) {
> -            return null;
> -        } else {
> -            currentProcessingRequest++;
> -            return requestsWindow.get(currentProcessingRequest - 1);
> +        synchronized (requestsWindow) {
> +            if (requestsWindow.isEmpty()) {
> +                return null;
> +            }
> +            if (currentProcessingRequest == null || currentProcessingRequest<  requestsWindow.firstKey()) {
> +                currentProcessingRequest = requestsWindow.firstKey();
> +            }
> +            if (currentProcessingRequest>  highestReadRid) {
> +                return null;
> +            } else {
> +                currentProcessingRequest++;
> +                return requestsWindow.get(currentProcessingRequest - 1);
> +            }
>           }
>       }
>
>
> Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
> URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java?rev=1327524&r1=1327523&r2=1327524&view=diff
> ==============================================================================
> --- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java (original)
> +++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java Wed Apr 18 14:36:32 2012
> @@ -46,6 +46,13 @@ import org.slf4j.LoggerFactory;
>   public class BoshHandler {
>
>       private static final Logger LOGGER = LoggerFactory.getLogger(BoshHandler.class);
> +
> +    /**
> +     * the empty BOSH response.
> +     *<p>
> +     * Looks like<code>&lt;body xmlns='http://jabber.org/protocol/httpbind'/&gt;</code>
> +     */
> +    protected static final Stanza EMPTY_BOSH_RESPONSE = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
>
>       private ServerRuntimeContext serverRuntimeContext;
>
> @@ -143,7 +150,7 @@ public class BoshHandler {
>           } else if (session.getState() == SessionState.AUTHENTICATED) {
>               if ("true".equals(br.getBody().getAttributeValue(NamespaceURIs.URN_XMPP_XBOSH, "restart"))) {
>                   // restart request
> -                session.write0(getRestartResponse());
> +                session.writeBOSHResponse(getRestartResponse());
>               } else {
>                   // any other request
>                   for (XMLElement element : br.getBody().getInnerElements()) {
> @@ -160,7 +167,7 @@ public class BoshHandler {
>
>       private void terminateSession(BoshBackedSessionContext session) {
>           sessions.remove(session.getSessionId());
> -        session.write0(getTerminateResponse());
> +        session.writeBOSHResponse(getTerminateResponse());
>           session.close();
>       }
>
> @@ -203,7 +210,7 @@ public class BoshHandler {
>           session.insertRequest(br);
>           sessions.put(session.getSessionId(), session);
>
> -        session.write0(getSessionCreationResponse(session));
> +        session.writeBOSHResponse(getSessionCreationResponse(session));
>       }
>
>       private Stanza getSessionCreationResponse(BoshBackedSessionContext session) {
> @@ -230,17 +237,6 @@ public class BoshHandler {
>       }
>
>       /**
> -     * Creates an empty BOSH response.
> -     *<p>
> -     * The empty BOSH response looks like<code>&lt;body xmlns='http://jabber.org/protocol/httpbind'/&gt;</code>
> -     * @return the empty BOSH response
> -     */
> -    public Stanza getEmptyResponse() {
> -        StanzaBuilder stanzaBuilder = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH);
> -        return stanzaBuilder.build();
> -    }
> -
> -    /**
>        * Creates a BOSH response by wrapping a stanza in a&lt;body/&gt; element
>        * @param stanza the XMPP stanza to wrap
>        * @return the BOSH response
>
> Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshServlet.java
> URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshServlet.java?rev=1327524&r1=1327523&r2=1327524&view=diff
> ==============================================================================
> --- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshServlet.java (original)
> +++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshServlet.java Wed Apr 18 14:36:32 2012
> @@ -52,7 +52,7 @@ public class BoshServlet extends HttpSer
>
>       private static final String INFO_GET = "This is an XMPP BOSH connection manager, only POST is allowed";
>
> -    private static final String SERVER_IDENTIFICATION = "Vysper/0.5";
> +    private static final String SERVER_IDENTIFICATION = "Vysper/0.8";
>
>       private final Logger logger = LoggerFactory.getLogger(BoshServlet.class);
>
>
> Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java
> URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java?rev=1327524&r1=1327523&r2=1327524&view=diff
> ==============================================================================
> --- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java (original)
> +++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java Wed Apr 18 14:36:32 2012
> @@ -88,7 +88,7 @@ public class BoshBackedSessionContextTes
>           BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler, serverRuntimeContext, inactivityChecker);
>           Stanza body = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
>           boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body, 1L));
> -        boshBackedSessionContext.write0(body);
> +        boshBackedSessionContext.writeBOSHResponse(body);
>           mocksControl.verify();
>
>           BoshResponse boshResponse = captured.getValue();
> @@ -116,7 +116,7 @@ public class BoshBackedSessionContextTes
>
>       @Test
>       public void testRequestExpired() {
> -        Stanza body = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
> +        Stanza emtpyStanza = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
>
>           // addRequest
>           HttpServletRequest httpServletRequest = mocksControl.createMock(HttpServletRequest.class);
> @@ -130,16 +130,13 @@ public class BoshBackedSessionContextTes
>           Capture<ContinuationListener>  listenerCaptured = new Capture<ContinuationListener>();
>           continuation.addContinuationListener(EasyMock.<ContinuationListener>  capture(listenerCaptured));
>
> -        BoshRequest br = new BoshRequest(httpServletRequest, body, 1L);
> +        BoshRequest br = new BoshRequest(httpServletRequest, emtpyStanza, 1L);
>
>           // requestExpired
>           expect(continuation.getAttribute("request")).andReturn(br);
>           Capture<BoshResponse>  responseCaptured = new Capture<BoshResponse>();
>           continuation.setAttribute(eq("response"), EasyMock.<BoshResponse>  capture(responseCaptured));
>
> -        expect(boshHandler.getEmptyResponse()).andReturn(body);
> -        expectLastCall().atLeastOnce();
> -
>           // write0
>           continuation.resume();
>
> @@ -150,7 +147,7 @@ public class BoshBackedSessionContextTes
>           listenerCaptured.getValue().onTimeout(continuation);
>           mocksControl.verify();
>
> -        assertEquals(new Renderer(body).getComplete(), new String(responseCaptured.getValue().getContent()));
> +        assertEquals(new Renderer(emtpyStanza).getComplete(), new String(responseCaptured.getValue().getContent()));
>           assertEquals(BoshServlet.XML_CONTENT_TYPE, responseCaptured.getValue().getContentType());
>       }
>
> @@ -191,7 +188,7 @@ public class BoshBackedSessionContextTes
>           // consecutive writes with RID 1 and 2
>           boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest1, body, 1L));
>           boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest2, body, 2L));
> -        boshBackedSessionContext.write0(body);
> +        boshBackedSessionContext.writeBOSHResponse(body);
>           mocksControl.verify();
>
>           assertEquals(httpServletRequest1, br1.getValue().getHttpServletRequest());
> @@ -227,8 +224,8 @@ public class BoshBackedSessionContextTes
>
>           BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler,
>                   serverRuntimeContext, inactivityChecker);
> -        boshBackedSessionContext.write0(body1);
> -        boshBackedSessionContext.write0(body2);
> +        boshBackedSessionContext.writeBOSHResponse(body1);
> +        boshBackedSessionContext.writeBOSHResponse(body2);
>           boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body, 1L));
>           mocksControl.verify();
>       }
>
> Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandlerTest.java
> URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandlerTest.java?rev=1327524&r1=1327523&r2=1327524&view=diff
> ==============================================================================
> --- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandlerTest.java (original)
> +++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandlerTest.java Wed Apr 18 14:36:32 2012
> @@ -155,7 +155,7 @@ public class BoshHandlerTest {
>
>       @Test
>       public void testGetEmptyResponse() {
> -        Stanza response = boshHandler.getEmptyResponse();
> +        Stanza response = BoshHandler.EMPTY_BOSH_RESPONSE;
>           assertNotNull(response);
>           assertEquals("body", response.getName());
>           assertEquals(NamespaceURIs.XEP0124_BOSH, response.getNamespaceURI());
>
>
>