You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ng...@apache.org on 2010/08/16 13:58:25 UTC

svn commit: r985891 - in /mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src: main/java/org/apache/vysper/xmpp/extension/xep0124/ test/java/org/apache/vysper/xmpp/extension/xep0124/

Author: ngn
Date: Mon Aug 16 11:58:25 2010
New Revision: 985891

URL: http://svn.apache.org/viewvc?rev=985891&view=rev
Log:
Implementation of client acknowledgement and handling resending responses (VYSPER-238, by Bogdan Pistol)

Modified:
    mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
    mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
    mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java

Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java Mon Aug 16 11:58:25 2010
@@ -25,6 +25,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.vysper.xml.fragment.Renderer;
+import org.apache.vysper.xml.fragment.XMLElement;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.server.AbstractSessionContext;
 import org.apache.vysper.xmpp.server.ServerRuntimeContext;
@@ -51,6 +52,28 @@ public class BoshBackedSessionContext ex
     private final int inactivity = 60;
 
     private final int polling = 15;
+    
+    private final int maximumSentResponses = 10;
+    
+    /*
+     * Keeps the suspended HTTP requests (does not respond to them) until the server has an asynchronous message
+     * to send to the client. (Comet HTTP Long Polling technique - described in XEP-0124)
+     * 
+     * The BOSH requests are sorted by their RIDs.
+     */
+    private final SortedMap<Long, BoshRequest> requestsWindow;
+
+    /*
+     * Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
+     * no available HTTP requests to respond to (requestsWindow is empty).
+     */
+    private final Queue<Stanza> delayedResponseQueue;
+    
+    /*
+     * A cache of sent responses to the BOSH client, kept in the event of delivery failure and retransmission requests.
+     * See Broken Connections in XEP-0124.
+     */
+    private final SortedMap<Long, BoshResponse> sentResponses;
 
     private int requests = 2;
 
@@ -74,18 +97,10 @@ public class BoshBackedSessionContext ex
     private BoshRequest latestEmptyPollingRequest = null;
     
     /*
-     * Keeps the suspended HTTP requests (does not respond to them) until the server has an asynchronous message
-     * to send to the client. (Comet HTTP Long Polling technique - described in XEP-0124)
-     * 
-     * The BOSH requests are sorted by their RIDs.
-     */
-    private SortedMap<Long, BoshRequest> requestsWindow;
-
-    /*
-     * Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
-     * no available HTTP requests to respond to (requestsWindow is empty).
+     * Indicates whether the BOSH client will use acknowledgements throughout the session, in which case the
+     * absence of an 'ack' attribute in any request is meaningful.
      */
-    private Queue<Stanza> delayedResponseQueue;
+    private boolean clientAcknowledgements;
 
     /**
      * Creates a new context for a session
@@ -101,6 +116,7 @@ public class BoshBackedSessionContext ex
         this.boshHandler = boshHandler;
         requestsWindow = new TreeMap<Long, BoshRequest>();
         delayedResponseQueue = new LinkedList<Stanza>();
+        sentResponses = new TreeMap<Long, BoshResponse>();
     }
     
     /**
@@ -152,28 +168,58 @@ public class BoshBackedSessionContext ex
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
         }
+        
+        if (isResponseSavable(req, response)) {
+            sentResponses.put(req.getRid(), boshResponse);
+            // The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
+            // number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
+            // that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or 
+            // the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
+            if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > requests)) {
+                sentResponses.remove(sentResponses.firstKey());
+            }
+        }
+        
         Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
         continuation.setAttribute("response", boshResponse);
         continuation.resume();
     }
     
+    private boolean isResponseSavable(BoshRequest req, Stanza response) {
+        // responses to pause requests are not saved
+        if (req.getBody().getAttributeValue("pause") != null) {
+            return false;
+        }
+        // responses with binding error are not saved
+        for (XMLElement element : response.getInnerElements()) {
+            if ("iq".equals(element.getName()) && "error".equals(element.getAttributeValue("type"))) {
+                for (XMLElement subelement : element.getInnerElements()) {
+                    if ("bind".equals(subelement.getName())) {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+    
     /**
      * Writes an error to the client and closes the connection
+     * @param br the BOSH request that triggered the error (it is added to the requests window before responding)
      * @param condition the error condition
      */
-    synchronized public void error(String condition) {
-        if (!requestsWindow.isEmpty()) {
-            BoshRequest req = requestsWindow.remove(requestsWindow.firstKey());
-            Stanza stanza = boshHandler.getTerminateResponse();
-            stanza = boshHandler.addAttribute(stanza, "condition", condition);
-            BoshResponse boshResponse = getBoshResponse(stanza, null);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
-            }
-            Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
-            continuation.setAttribute("response", boshResponse);
-            continuation.resume();
+    private void error(BoshRequest br, String condition) {
+        requestsWindow.put(br.getRid(), br);
+        BoshRequest req = requestsWindow.remove(requestsWindow.firstKey());
+        Stanza stanza = boshHandler.getTerminateResponse();
+        stanza = boshHandler.addAttribute(stanza, "condition", condition);
+        BoshResponse boshResponse = getBoshResponse(stanza, null);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
         }
+        Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
+        continuation.setAttribute("response", boshResponse);
+        continuation.resume();
         close();
     }
 
@@ -251,6 +297,22 @@ public class BoshBackedSessionContext ex
     public int getHold() {
         return hold;
     }
+    
+    /**
+     * Setter for the client acknowledgements throughout the session
+     * @param value true if enabled, false otherwise
+     */
+    public void setClientAcknowledgements(boolean value) {
+        clientAcknowledgements = value;
+    }
+    
+    /**
+     * Getter for client acknowledgements
+     * @return true if enabled, false otherwise
+     */
+    public boolean isClientAcknowledgements() {
+        return clientAcknowledgements;
+    }
 
     /**
      * Setter for the highest version of the BOSH protocol that the connection manager supports, or the version
@@ -328,19 +390,31 @@ public class BoshBackedSessionContext ex
      * @param req the HTTP request
      */
     public void insertRequest(BoshRequest br) {
+        Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
+        addContinuationExpirationListener(continuation);
+        continuation.setTimeout(wait * 1000);
+        continuation.setAttribute("request", br);
+        continuation.suspend();
+        
         if (highestReadRid != null && highestReadRid + requests < br.getRid()) {
             LOGGER.warn("BOSH received RID greater than the permitted window of concurrent requests");
-            error("item-not-found");
+            error(br, "item-not-found");
             return;
         }
-        if (highestReadRid != null && br.getRid() <= highestReadRid || requestsWindow.containsKey(br.getRid())) {
-            // TODO: return the old response
+        if (highestReadRid != null && br.getRid() <= highestReadRid) {
+            if (sentResponses.containsKey(br.getRid())) {
+                // Resending the old response
+                resendResponse(br);
+            } else {
+                LOGGER.warn("BOSH response not in buffer error");
+                error(br, "item-not-found");
+            }
             return;
         }
         if (requestsWindow.size() + 1 > requests && !"terminate".equals(br.getBody().getAttributeValue("type"))
                 && br.getBody().getAttributeValue("pause") == null) {
             LOGGER.warn("BOSH Overactivity: Too many simultaneous requests");
-            error("policy-violation");
+            error(br, "policy-violation");
             return;
         }
         if (requestsWindow.size() + 1 == requests && !"terminate".equals(br.getBody().getAttributeValue("type"))
@@ -348,49 +422,37 @@ public class BoshBackedSessionContext ex
             if (!requestsWindow.isEmpty()
                     && br.getTimestamp() - requestsWindow.get(requestsWindow.lastKey()).getTimestamp() < polling * 1000) {
                 LOGGER.warn("BOSH Overactivity: Too frequent requests");
-                error("policy-violation");
+                error(br, "policy-violation");
                 return;
             }
         }
         if ((wait == 0 || hold == 0) && br.getBody().getInnerElements().isEmpty()) {
             if (latestEmptyPollingRequest != null && br.getTimestamp() - latestEmptyPollingRequest.getTimestamp() < polling * 1000) {
                 LOGGER.warn("BOSH Overactivity for polling: Too frequent requests");
-                error("policy-violation");
+                error(br, "policy-violation");
                 return;
             }
             latestEmptyPollingRequest = br;
         }
-        Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
-        continuation.setTimeout(wait * 1000);
-        continuation.suspend();
-        continuation.setAttribute("request", br);
+        if (isClientAcknowledgements()) {
+            // TODO: if the received client ack is not the expected one, then send a response report to the client informing it about this,
+            // so that it can re-request the missing responses.
+        }
+        
         requestsWindow.put(br.getRid(), br);
         if (highestReadRid == null) {
             highestReadRid = br.getRid();
         }
         for (;;) {
-            // update the highestAcknowledgedRid to the latest value
-            // it is possible to have higher RIDs than the highestAcknowledgedRid with a gap between them (e.g. lost client request)
+            // update the highestReadRid to the latest value
+            // it is possible to have higher RIDs than the highestReadRid with a gap between them (e.g. lost client request)
             if (requestsWindow.containsKey(highestReadRid + 1)) {
                 highestReadRid++;
             } else {
                 break;
             }
         }
-
-        // listen the continuation to be notified when the request expires
-        continuation.addContinuationListener(new ContinuationListener() {
-
-            public void onTimeout(Continuation continuation) {
-                requestExpired(continuation);
-            }
-
-            public void onComplete(Continuation continuation) {
-                // ignore
-            }
-
-        });
-
+        
         // If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
         // a <body/> element and send them as a HTTP response to the current HTTP request.
         Stanza delayedResponse;
@@ -409,6 +471,31 @@ public class BoshBackedSessionContext ex
             write0(boshHandler.getEmptyResponse());
         }
     }
+    
+    private void addContinuationExpirationListener(Continuation continuation) {
+        // listen to the continuation to be notified when the request expires
+        continuation.addContinuationListener(new ContinuationListener() {
+
+            public void onTimeout(Continuation continuation) {
+                requestExpired(continuation);
+            }
+
+            public void onComplete(Continuation continuation) {
+                // ignore
+            }
+
+        });
+    }
+    
+    private void resendResponse(BoshRequest br) {
+        BoshResponse boshResponse = sentResponses.get(br.getRid());
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
+        }
+        Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
+        continuation.setAttribute("response", boshResponse);
+        continuation.resume();
+    }
 
     private BoshResponse getBoshResponse(Stanza stanza, Long ack) {
         if (ack != null) {

Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java Mon Aug 16 11:58:25 2010
@@ -193,6 +193,9 @@ public class BoshHandler {
             String lang = br.getBody().getAttributeValue(NamespaceURIs.XML, "lang");
             session.setXMLLang(lang);
         }
+        if ("1".equals(br.getBody().getAttributeValue("ack"))) {
+            session.setClientAcknowledgements(true);
+        }
         session.insertRequest(br);
         sessions.put(session.getSessionId(), session);
 

Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java Mon Aug 16 11:58:25 2010
@@ -210,8 +210,9 @@ public class BoshBackedSessionContextTes
 
         Stanza body1 = mocksControl.createMock(Stanza.class);
         Stanza body2 = mocksControl.createMock(Stanza.class);
-        expect(boshHandler.mergeResponses(EasyMock.<Stanza> anyObject(), EasyMock.<Stanza> anyObject())).andReturn(
-                new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build());
+        Stanza body = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
+        expect(boshHandler.mergeResponses(EasyMock.<Stanza> anyObject(), EasyMock.<Stanza> anyObject()))
+                .andReturn(body);
         expectLastCall().times(2);
 
         continuation.setAttribute(eq("response"), EasyMock.<BoshResponse> anyObject());
@@ -219,10 +220,11 @@ public class BoshBackedSessionContextTes
 
         mocksControl.replay();
 
-        BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler, serverRuntimeContext);
+        BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler,
+                serverRuntimeContext);
         boshBackedSessionContext.write0(body1);
         boshBackedSessionContext.write0(body2);
-        boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body1, 1L));
+        boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body, 1L));
         mocksControl.verify();
     }