You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by bo...@apache.org on 2010/09/27 01:39:48 UTC
svn commit: r1001557 - in
/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124:
BoshBackedSessionContext.java BoshHandler.java InactivityChecker.java
Author: bogdan
Date: Sun Sep 26 23:39:48 2010
New Revision: 1001557
URL: http://svn.apache.org/viewvc?rev=1001557&view=rev
Log:
Bosh inactivity checking thread (VYSPER-243)
Added:
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/InactivityChecker.java
Modified:
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java?rev=1001557&r1=1001556&r2=1001557&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java Sun Sep 26 23:39:48 2010
@@ -22,8 +22,6 @@ package org.apache.vysper.xmpp.extension
import java.util.LinkedList;
import java.util.Queue;
import java.util.SortedMap;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.TreeMap;
import org.apache.vysper.xml.fragment.Renderer;
@@ -120,7 +118,11 @@ public class BoshBackedSessionContext ex
*/
private long latestWriteTimestamp = System.currentTimeMillis();
- private Timer inactivityChecker = new Timer(true);
+ private InactivityChecker inactivityChecker;
+
+ private Long lastInactivityExpireTime;
+
+ private boolean isWatchedByInactivityChecker;
/**
* Creates a new context for a session
@@ -140,22 +142,29 @@ public class BoshBackedSessionContext ex
}
/**
- * Periodically checks if the session reached the maximum inactivity period.
+ * Configures the inactivity checker instance
*/
- public void startInactivityChecker() {
- inactivityChecker.schedule(new TimerTask() {
-
- @Override
- public void run() {
- synchronized (BoshBackedSessionContext.this) {
- if (System.currentTimeMillis() - latestWriteTimestamp >= currentInactivity * 1000 && requestsWindow.isEmpty()) {
- LOGGER.info("BOSH session reached maximum inactivity period, closing session...");
- close();
- }
- }
+ public void setInactivityChecker(InactivityChecker inactivityChecker) {
+ this.inactivityChecker = inactivityChecker;
+ updateInactivityChecker();
+ }
+
+ public boolean isWatchedByInactivityChecker() {
+ return isWatchedByInactivityChecker;
+ }
+
+ private void updateInactivityChecker() {
+ Long newInactivityExpireTime = null;
+ if (requestsWindow.isEmpty()) {
+ newInactivityExpireTime = latestWriteTimestamp + currentInactivity * 1000;
+ if (newInactivityExpireTime == lastInactivityExpireTime) {
+ return;
}
-
- }, 1000, 1000);
+ } else if (!isWatchedByInactivityChecker) {
+ return;
+ }
+ isWatchedByInactivityChecker = inactivityChecker.updateExpireTime(this, lastInactivityExpireTime, newInactivityExpireTime);
+ lastInactivityExpireTime = newInactivityExpireTime;
}
/**
@@ -223,6 +232,7 @@ public class BoshBackedSessionContext ex
continuation.setAttribute("response", boshResponse);
continuation.resume();
latestWriteTimestamp = System.currentTimeMillis();
+ updateInactivityChecker();
}
private boolean isResponseSavable(BoshRequest req, Stanza response) {
@@ -283,7 +293,8 @@ public class BoshBackedSessionContext ex
serverRuntimeContext.getResourceRegistry().unbindSession(this);
sessionStateHolder.setState(SessionState.CLOSED);
- inactivityChecker.cancel();
+ inactivityChecker.updateExpireTime(this, lastInactivityExpireTime, null);
+ lastInactivityExpireTime = null;
LOGGER.info("BOSH session {} closed", getSessionId());
}
@@ -497,6 +508,8 @@ public class BoshBackedSessionContext ex
}
requestsWindow.put(br.getRid(), br);
+ updateInactivityChecker();
+
if (highestReadRid == null) {
highestReadRid = br.getRid();
}
@@ -608,6 +621,7 @@ public class BoshBackedSessionContext ex
continuation.setAttribute("response", boshResponse);
continuation.resume();
latestWriteTimestamp = System.currentTimeMillis();
+ updateInactivityChecker();
}
private BoshResponse getBoshResponse(Stanza stanza, Long ack) {
Modified: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java?rev=1001557&r1=1001556&r2=1001557&view=diff
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java (original)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshHandler.java Sun Sep 26 23:39:48 2010
@@ -50,12 +50,16 @@ public class BoshHandler {
private ServerRuntimeContext serverRuntimeContext;
private Map<String, BoshBackedSessionContext> sessions;
+
+ private InactivityChecker inactivityChecker;
public BoshHandler() {
// The sessions are stored in a ConcurrentHashMap to maintain the "happens before relationship" memory consistency.
// Although the operations on specific sessions are synchronized, the session creation and retrieval need the memory
// consistency guarantee too.
sessions = new ConcurrentHashMap<String, BoshBackedSessionContext>();
+ inactivityChecker = new InactivityChecker();
+ inactivityChecker.start();
}
@@ -196,7 +200,7 @@ public class BoshHandler {
if ("1".equals(br.getBody().getAttributeValue("ack"))) {
session.setClientAcknowledgements(true);
}
- session.startInactivityChecker();
+ session.setInactivityChecker(inactivityChecker);
session.insertRequest(br);
sessions.put(session.getSessionId(), session);
Added: mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/InactivityChecker.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/InactivityChecker.java?rev=1001557&view=auto
==============================================================================
--- mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/InactivityChecker.java (added)
+++ mina/vysper/trunk/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/InactivityChecker.java Sun Sep 26 23:39:48 2010
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.vysper.xmpp.extension.xep0124;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically checks the BOSH sessions to see if there are inactive sessions,
+ * in this case it will close the inactive sessions.
+ * <p>
+ * This class efficiently checks the inactive sessions.
+ * Supposing that at a specific moment there are N total sessions connected and from these sessions
+ * there are M sessions that are inactive (M is lower than or equal to N) then this class will detect the inactive sessions with
+ * O(M) time complexity.
+ * <p>
+ * <b>Note:</b> A modification of the expire (when becoming inactive) time of a sessions has approximatively
+ * O(log N) time complexity.
+ * <p>
+ * This class is thread safe.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ */
+public class InactivityChecker extends Thread {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InactivityChecker.class);
+
+ /*
+ * The interval in milliseconds between two consecutive inactivity checks.
+ */
+ private final int CHECKING_INTERVAL = 1000;
+
+ /*
+ * Keeps the BOSH sessions sorted according to the time they expire (the key of the map).
+ *
+ * The value associated with a key in the map can be a BoshBackedSessionContext
+ * or a Set<BoshBackedSessionContext> (in case more than one session expires at the same time).
+ */
+ private final SortedMap<Long, Object> sessions;
+
+ public InactivityChecker() {
+ setName(InactivityChecker.class.getSimpleName());
+ setDaemon(true);
+ sessions = new TreeMap<Long, Object>();
+ }
+
+ /**
+ * Updates (or removes) a session expire time.
+ * <p>
+ * If it is a new session then oldExpireTime will be null and newExpireTime will be the expire time,
+ * if the session is removed from the inactivity checker then the newExpireTime will be null and the oldExpireTime
+ * will be the latest expire time the session had. If it is an update for an old expire time then oldExpireTime will be
+ * the latest expire time and newExpireTime will be the updated expire time.
+ * <p>
+ * <b>Note:</b> The session should be added to the inactivity checker only when BoshBackedSessionContext#requestsWindow.isEmpty()
+ * returns true (as stated in the specification). Also when the BoshBackedSessionContext#requestsWindow.isEmpty() returns
+ * false the session needs to be removed from the inactivity checker.
+ *
+ * @param session the {@link BoshBackedSessionContext} that the expire time is modified for
+ * @param oldExpireTime the latest expire time the session had
+ * @param newExpireTime the new updated expire time, (if this is null and the session is watched by the inactivity checker
+ * then the session will be removed from the inactivity checker)
+ * @return true if the inactivity checker is watching the session (to detect inactivity), false otherwise
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public boolean updateExpireTime(BoshBackedSessionContext session, Long oldExpireTime, Long newExpireTime) {
+ boolean ret = session.isWatchedByInactivityChecker();
+ if ((oldExpireTime == null && newExpireTime == null)
+ || (newExpireTime != null && newExpireTime.equals(oldExpireTime))) {
+ return ret;
+ }
+ synchronized (sessions) {
+ if (oldExpireTime != null) {
+ Object oldValue = sessions.get(oldExpireTime);
+ if (oldValue instanceof Set) {
+ ((Set) oldValue).remove(session);
+ if (((Set) oldValue).isEmpty()) {
+ sessions.remove(oldExpireTime);
+ }
+ } else if (oldValue != null) {
+ sessions.remove(oldExpireTime);
+ }
+ ret = false;
+ }
+ if (newExpireTime != null) {
+ Object value = sessions.get(newExpireTime);
+ if (value instanceof Set) {
+ ((Set) value).add(session);
+ } else if (value == null) {
+ sessions.put(newExpireTime, session);
+ } else {
+ Set<BoshBackedSessionContext> set = new HashSet<BoshBackedSessionContext>();
+ sessions.put(newExpireTime, set);
+ set.add((BoshBackedSessionContext) value);
+ set.add(session);
+ }
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ for (;;) {
+ if (Thread.interrupted()) {
+ break;
+ }
+
+ synchronized (this) {
+ try {
+ wait(CHECKING_INTERVAL);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ long time = System.currentTimeMillis();
+
+ // the inactive sessions are saved in a list to close them after the synchronized block to prevent a deadlock
+ // that could happen when trying to close the sessions inside the synchronized block
+ List<BoshBackedSessionContext> inactiveSessions = null;
+
+ synchronized (sessions) {
+ // get the oldest key
+ Long expireTime = sessions.isEmpty() ? null : sessions.firstKey();
+
+ while (expireTime != null) {
+ // as long as we find expired sessions we save them in the list and remove them from our sorted map
+ if (time >= expireTime) {
+ if (inactiveSessions == null) {
+ inactiveSessions = new ArrayList<BoshBackedSessionContext>();
+ }
+ Object value = sessions.get(expireTime);
+ if (value instanceof Set) {
+ inactiveSessions.addAll((Set<BoshBackedSessionContext>) value);
+ } else if (value != null) {
+ inactiveSessions.add((BoshBackedSessionContext) value);
+ }
+ sessions.remove(expireTime);
+ expireTime = sessions.isEmpty() ? null : sessions.firstKey();
+ } else {
+ // at the first non-expired session, we know that all the next sessions are more recent and cannot
+ // be expired if the current session is Ok, so we break the loop
+ break;
+ }
+ }
+ }
+
+ if (inactiveSessions != null) {
+ for (BoshBackedSessionContext session : inactiveSessions) {
+ LOGGER.error("BOSH session {} reached maximum inactivity period, closing session...", session);
+ session.close();
+ }
+ }
+ }
+ }
+
+}