You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ng...@apache.org on 2010/08/16 13:58:25 UTC
svn commit: r985891 - in
/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src:
main/java/org/apache/vysper/xmpp/extension/xep0124/
test/java/org/apache/vysper/xmpp/extension/xep0124/
Author: ngn
Date: Mon Aug 16 11:58:25 2010
New Revision: 985891
URL: http://svn.apache.org/viewvc?rev=985891&view=rev
Log:
Implementation of client acknowledgement and handling resending responses (VYSPER-238, by Bogdan Pistol)
Modified:
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java
Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java Mon Aug 16 11:58:25 2010
@@ -25,6 +25,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.vysper.xml.fragment.Renderer;
+import org.apache.vysper.xml.fragment.XMLElement;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.server.AbstractSessionContext;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
@@ -51,6 +52,28 @@ public class BoshBackedSessionContext ex
private final int inactivity = 60;
private final int polling = 15;
+
+ private final int maximumSentResponses = 10;
+
+ /*
+ * Keeps the suspended HTTP requests (does not respond to them) until the server has an asynchronous message
+ * to send to the client. (Comet HTTP Long Polling technique - described in XEP-0124)
+ *
+ * The BOSH requests are sorted by their RIDs.
+ */
+ private final SortedMap<Long, BoshRequest> requestsWindow;
+
+ /*
+ * Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
+ * no available HTTP requests to respond to (requestsWindow is empty).
+ */
+ private final Queue<Stanza> delayedResponseQueue;
+
+ /*
+ * A cache of sent responses to the BOSH client, kept in the event of delivery failure and retransmission requests.
+ * See Broken Connections in XEP-0124.
+ */
+ private final SortedMap<Long, BoshResponse> sentResponses;
private int requests = 2;
@@ -74,18 +97,10 @@ public class BoshBackedSessionContext ex
private BoshRequest latestEmptyPollingRequest = null;
/*
- * Keeps the suspended HTTP requests (does not respond to them) until the server has an asynchronous message
- * to send to the client. (Comet HTTP Long Polling technique - described in XEP-0124)
- *
- * The BOSH requests are sorted by their RIDs.
- */
- private SortedMap<Long, BoshRequest> requestsWindow;
-
- /*
- * Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
- * no available HTTP requests to respond to (requestsWindow is empty).
+ * Indicate if the BOSH client will use acknowledgements throughout the session and that the absence of an 'ack'
+ * attribute in any request is meaningful.
*/
- private Queue<Stanza> delayedResponseQueue;
+ private boolean clientAcknowledgements;
/**
* Creates a new context for a session
@@ -101,6 +116,7 @@ public class BoshBackedSessionContext ex
this.boshHandler = boshHandler;
requestsWindow = new TreeMap<Long, BoshRequest>();
delayedResponseQueue = new LinkedList<Stanza>();
+ sentResponses = new TreeMap<Long, BoshResponse>();
}
/**
@@ -152,28 +168,58 @@ public class BoshBackedSessionContext ex
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
}
+
+ if (isResponseSavable(req, response)) {
+ sentResponses.put(req.getRid(), boshResponse);
+ // The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
+ // number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
+ // that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
+ // the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
+ if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > requests)) {
+ sentResponses.remove(sentResponses.firstKey());
+ }
+ }
+
Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
continuation.setAttribute("response", boshResponse);
continuation.resume();
}
+ private boolean isResponseSavable(BoshRequest req, Stanza response) {
+ // responses to pause requests are not saved
+ if (req.getBody().getAttributeValue("pause") != null) {
+ return false;
+ }
+ // responses with binding error are not saved
+ for (XMLElement element : response.getInnerElements()) {
+ if ("iq".equals(element.getName()) && "error".equals(element.getAttributeValue("type"))) {
+ for (XMLElement subelement : element.getInnerElements()) {
+ if ("bind".equals(subelement.getName())) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
/**
* Writes an error to the client and closes the connection
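+ * @param br the BOSH request that triggered the error (it is added to the requests window before the terminate response is written)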
* @param condition the error condition
*/
- synchronized public void error(String condition) {
- if (!requestsWindow.isEmpty()) {
- BoshRequest req = requestsWindow.remove(requestsWindow.firstKey());
- Stanza stanza = boshHandler.getTerminateResponse();
- stanza = boshHandler.addAttribute(stanza, "condition", condition);
- BoshResponse boshResponse = getBoshResponse(stanza, null);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
- }
- Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
- continuation.setAttribute("response", boshResponse);
- continuation.resume();
+ private void error(BoshRequest br, String condition) {
+ requestsWindow.put(br.getRid(), br);
+ BoshRequest req = requestsWindow.remove(requestsWindow.firstKey());
+ Stanza stanza = boshHandler.getTerminateResponse();
+ stanza = boshHandler.addAttribute(stanza, "condition", condition);
+ BoshResponse boshResponse = getBoshResponse(stanza, null);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
}
+ Continuation continuation = ContinuationSupport.getContinuation(req.getHttpServletRequest());
+ continuation.setAttribute("response", boshResponse);
+ continuation.resume();
close();
}
@@ -251,6 +297,22 @@ public class BoshBackedSessionContext ex
public int getHold() {
return hold;
}
+
+ /**
+ * Setter for the client acknowledgements throughout the session
+ * @param value true if enabled, false otherwise
+ */
+ public void setClientAcknowledgements(boolean value) {
+ clientAcknowledgements = value;
+ }
+
+ /**
+ * Getter for client acknowledgements
+ * @return true if enabled, false otherwise
+ */
+ public boolean isClientAcknowledgements() {
+ return clientAcknowledgements;
+ }
/**
* Setter for the highest version of the BOSH protocol that the connection manager supports, or the version
@@ -328,19 +390,31 @@ public class BoshBackedSessionContext ex
* @param req the HTTP request
*/
public void insertRequest(BoshRequest br) {
+ Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
+ addContinuationExpirationListener(continuation);
+ continuation.setTimeout(wait * 1000);
+ continuation.setAttribute("request", br);
+ continuation.suspend();
+
if (highestReadRid != null && highestReadRid + requests < br.getRid()) {
LOGGER.warn("BOSH received RID greater than the permitted window of concurrent requests");
- error("item-not-found");
+ error(br, "item-not-found");
return;
}
- if (highestReadRid != null && br.getRid() <= highestReadRid || requestsWindow.containsKey(br.getRid())) {
- // TODO: return the old response
+ if (highestReadRid != null && br.getRid() <= highestReadRid) {
+ if (sentResponses.containsKey(br.getRid())) {
+ // Resending the old response
+ resendResponse(br);
+ } else {
+ LOGGER.warn("BOSH response not in buffer error");
+ error(br, "item-not-found");
+ }
return;
}
if (requestsWindow.size() + 1 > requests && !"terminate".equals(br.getBody().getAttributeValue("type"))
&& br.getBody().getAttributeValue("pause") == null) {
LOGGER.warn("BOSH Overactivity: Too many simultaneous requests");
- error("policy-violation");
+ error(br, "policy-violation");
return;
}
if (requestsWindow.size() + 1 == requests && !"terminate".equals(br.getBody().getAttributeValue("type"))
@@ -348,49 +422,37 @@ public class BoshBackedSessionContext ex
if (!requestsWindow.isEmpty()
&& br.getTimestamp() - requestsWindow.get(requestsWindow.lastKey()).getTimestamp() < polling * 1000) {
LOGGER.warn("BOSH Overactivity: Too frequent requests");
- error("policy-violation");
+ error(br, "policy-violation");
return;
}
}
if ((wait == 0 || hold == 0) && br.getBody().getInnerElements().isEmpty()) {
if (latestEmptyPollingRequest != null && br.getTimestamp() - latestEmptyPollingRequest.getTimestamp() < polling * 1000) {
LOGGER.warn("BOSH Overactivity for polling: Too frequent requests");
- error("policy-violation");
+ error(br, "policy-violation");
return;
}
latestEmptyPollingRequest = br;
}
- Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
- continuation.setTimeout(wait * 1000);
- continuation.suspend();
- continuation.setAttribute("request", br);
+ if (isClientAcknowledgements()) {
+ // TODO: if the received client ack is not the expected one, then send a response report to the client informing it about this,
+ // so that it can re-request the missing responses.
+ }
+
requestsWindow.put(br.getRid(), br);
if (highestReadRid == null) {
highestReadRid = br.getRid();
}
for (;;) {
- // update the highestAcknowledgedRid to the latest value
- // it is possible to have higher RIDs than the highestAcknowledgedRid with a gap between them (e.g. lost client request)
+ // update the highestReadRid to the latest value
+ // it is possible to have higher RIDs than the highestReadRid with a gap between them (e.g. lost client request)
if (requestsWindow.containsKey(highestReadRid + 1)) {
highestReadRid++;
} else {
break;
}
}
-
- // listen the continuation to be notified when the request expires
- continuation.addContinuationListener(new ContinuationListener() {
-
- public void onTimeout(Continuation continuation) {
- requestExpired(continuation);
- }
-
- public void onComplete(Continuation continuation) {
- // ignore
- }
-
- });
-
+
// If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
// a <body/> element and send them as a HTTP response to the current HTTP request.
Stanza delayedResponse;
@@ -409,6 +471,31 @@ public class BoshBackedSessionContext ex
write0(boshHandler.getEmptyResponse());
}
}
+
+ private void addContinuationExpirationListener(Continuation continuation) {
+ // listen to the continuation to be notified when the request expires
+ continuation.addContinuationListener(new ContinuationListener() {
+
+ public void onTimeout(Continuation continuation) {
+ requestExpired(continuation);
+ }
+
+ public void onComplete(Continuation continuation) {
+ // ignore
+ }
+
+ });
+ }
+
+ private void resendResponse(BoshRequest br) {
+ BoshResponse boshResponse = sentResponses.get(br.getRid());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("BOSH writing response: {}", new String(boshResponse.getContent()));
+ }
+ Continuation continuation = ContinuationSupport.getContinuation(br.getHttpServletRequest());
+ continuation.setAttribute("response", boshResponse);
+ continuation.resume();
+ }
private BoshResponse getBoshResponse(Stanza stanza, Long ack) {
if (ack != null) {
Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java Mon Aug 16 11:58:25 2010
@@ -193,6 +193,9 @@ public class BoshHandler {
String lang = br.getBody().getAttributeValue(NamespaceURIs.XML, "lang");
session.setXMLLang(lang);
}
+ if ("1".equals(br.getBody().getAttributeValue("ack"))) {
+ session.setClientAcknowledgements(true);
+ }
session.insertRequest(br);
sessions.put(session.getSessionId(), session);
Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java?rev=985891&r1=985890&r2=985891&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContextTest.java Mon Aug 16 11:58:25 2010
@@ -210,8 +210,9 @@ public class BoshBackedSessionContextTes
Stanza body1 = mocksControl.createMock(Stanza.class);
Stanza body2 = mocksControl.createMock(Stanza.class);
- expect(boshHandler.mergeResponses(EasyMock.<Stanza> anyObject(), EasyMock.<Stanza> anyObject())).andReturn(
- new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build());
+ Stanza body = new StanzaBuilder("body", NamespaceURIs.XEP0124_BOSH).build();
+ expect(boshHandler.mergeResponses(EasyMock.<Stanza> anyObject(), EasyMock.<Stanza> anyObject()))
+ .andReturn(body);
expectLastCall().times(2);
continuation.setAttribute(eq("response"), EasyMock.<BoshResponse> anyObject());
@@ -219,10 +220,11 @@ public class BoshBackedSessionContextTes
mocksControl.replay();
- BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler, serverRuntimeContext);
+ BoshBackedSessionContext boshBackedSessionContext = new BoshBackedSessionContext(boshHandler,
+ serverRuntimeContext);
boshBackedSessionContext.write0(body1);
boshBackedSessionContext.write0(body2);
- boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body1, 1L));
+ boshBackedSessionContext.insertRequest(new BoshRequest(httpServletRequest, body, 1L));
mocksControl.verify();
}