You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by be...@apache.org on 2011/01/18 16:26:12 UTC

svn commit: r1060416 - in /mina/vysper/trunk/server/core/src: main/java/org/apache/vysper/storage/inmemory/ main/java/org/apache/vysper/xmpp/delivery/inbound/ main/java/org/apache/vysper/xmpp/modules/core/im/handler/ main/java/org/apache/vysper/xmpp/mo...

Author: berndf
Date: Tue Jan 18 15:26:11 2011
New Revision: 1060416

URL: http://svn.apache.org/viewvc?rev=1060416&view=rev
Log:
VYSPER-180 - Add hook for catching stanzas for offline-storage. contributed by Thomas Kratz. Thanks for contributing.

Added:
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/AbstractOfflineStorageProvider.java
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/MemoryOfflineStorageProvider.java
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/OfflineStorageProvider.java
Modified:
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/storage/inmemory/MemoryStorageProviderRegistry.java
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInternalInboundStanzaRelay.java
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java
    mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/XMPPServer.java
    mina/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java

Modified: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/storage/inmemory/MemoryStorageProviderRegistry.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/storage/inmemory/MemoryStorageProviderRegistry.java?rev=1060416&r1=1060415&r2=1060416&view=diff
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/storage/inmemory/MemoryStorageProviderRegistry.java (original)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/storage/inmemory/MemoryStorageProviderRegistry.java Tue Jan 18 15:26:11 2011
@@ -37,6 +37,7 @@ public class MemoryStorageProviderRegist
         // provider from external modules, low coupling, fail when modules are not present
         add("org.apache.vysper.xmpp.modules.extension.xep0060_pubsub.storageprovider.LeafNodeInMemoryStorageProvider");
         add("org.apache.vysper.xmpp.modules.extension.xep0060_pubsub.storageprovider.CollectionNodeInMemoryStorageProvider");
+        //add("org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage.MemoryOfflineStorageProvider");
     }
 
 }
\ No newline at end of file

Modified: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInternalInboundStanzaRelay.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInternalInboundStanzaRelay.java?rev=1060416&r1=1060415&r2=1060416&view=diff
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInternalInboundStanzaRelay.java (original)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInternalInboundStanzaRelay.java Tue Jan 18 15:26:11 2011
@@ -40,6 +40,7 @@ import org.apache.vysper.xmpp.delivery.f
 import org.apache.vysper.xmpp.delivery.failure.LocalRecipientOfflineException;
 import org.apache.vysper.xmpp.delivery.failure.NoSuchLocalUserException;
 import org.apache.vysper.xmpp.delivery.failure.ServiceNotAvailableException;
+import org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage.OfflineStorageProvider;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.protocol.StanzaHandler;
 import org.apache.vysper.xmpp.protocol.StanzaProcessor;
@@ -88,14 +89,15 @@ public class DeliveringInternalInboundSt
     public DeliveringInternalInboundStanzaRelay(Entity serverEntity, ResourceRegistry resourceRegistry,
             StorageProviderRegistry storageProviderRegistry) {
         this(serverEntity, resourceRegistry, (AccountManagement) storageProviderRegistry
-                .retrieve(AccountManagement.class));
+                .retrieve(AccountManagement.class),(OfflineStanzaReceiver)storageProviderRegistry.retrieve(OfflineStorageProvider.class));
     }
 
     public DeliveringInternalInboundStanzaRelay(Entity serverEntity, ResourceRegistry resourceRegistry,
-            AccountManagement accountVerification) {
+            AccountManagement accountVerification, OfflineStanzaReceiver offlineStanzaReceiver) {
         this.serverEntity = serverEntity;
         this.resourceRegistry = resourceRegistry;
         this.accountVerification = accountVerification;
+        this.offlineStanzaReceiver =offlineStanzaReceiver;
         int coreThreadCount = 10;
         int maxThreadCount = 20;
         int threadTimeoutSeconds = 2 * 60 * 1000;
@@ -323,6 +325,10 @@ public class DeliveringInternalInboundSt
             List<SessionContext> receivingSessions = prioThreshold == null ? resourceRegistry.getSessions(receiver)
                     : resourceRegistry.getSessions(receiver, prioThreshold);
 
+            if (receivingSessions.size() == 0) {
+                return relayNotPossible();
+            }
+
             if (receivingSessions.size() > 1) {
                 logger.warn("multiplexing: {} sessions will be processing {} ", receivingSessions.size(), stanza);
             }

Modified: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java?rev=1060416&r1=1060415&r2=1060416&view=diff
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java (original)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java Tue Jan 18 15:26:11 2011
@@ -27,6 +27,7 @@ import static org.apache.vysper.xmpp.sta
 import static org.apache.vysper.xmpp.stanza.PresenceStanzaType.isSubscriptionType;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.vysper.xmpp.addressing
 import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy;
 import org.apache.vysper.xmpp.modules.core.base.handler.XMPPCoreStanzaHandler;
+import org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage.OfflineStorageProvider;
 import org.apache.vysper.xmpp.modules.roster.RosterException;
 import org.apache.vysper.xmpp.modules.roster.RosterItem;
 import org.apache.vysper.xmpp.modules.roster.RosterUtils;
@@ -253,6 +255,19 @@ public class PresenceAvailabilityHandler
         // message delivery (see RFC3921bis-05#8.3.1.1)
         registry.setResourcePriority(resourceId, presenceStanza.getPrioritySafe());
 
+		// check for pending offline stored stanzas, and send them out 
+		OfflineStorageProvider offlineProvider = (OfflineStorageProvider) serverRuntimeContext
+				.getStorageProvider(OfflineStorageProvider.class);
+		if (offlineProvider == null) {
+			logger.warn("No Offline Storage Provider configured");
+		} else {
+			Collection<Stanza> offlineStanzas = offlineProvider.getStanzasForBareJID(user.getBareJID().getFullQualifiedName());
+			for (Stanza stanza : offlineStanzas) {
+				logger.debug("Sending out delayed offline stanza");
+				relayStanza(user, stanza, sessionContext);
+			}
+		}
+
         List<Entity> contacts = new ArrayList<Entity>();
 
         Map<SubscriptionType, List<RosterItem>> itemMap = RosterUtils.getRosterItemsByState(rosterManager, user);

Added: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/AbstractOfflineStorageProvider.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/AbstractOfflineStorageProvider.java?rev=1060416&view=auto
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/AbstractOfflineStorageProvider.java (added)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/AbstractOfflineStorageProvider.java Tue Jan 18 15:26:11 2011
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage;
+
+import org.apache.vysper.xmpp.stanza.MessageStanza;
+import org.apache.vysper.xmpp.stanza.MessageStanzaType;
+import org.apache.vysper.xmpp.stanza.PresenceStanza;
+import org.apache.vysper.xmpp.stanza.PresenceStanzaType;
+import org.apache.vysper.xmpp.stanza.Stanza;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractOfflineStorageProvider implements
+		OfflineStorageProvider {
+	
+	final Logger logger = LoggerFactory.getLogger(AbstractOfflineStorageProvider.class);
+	
+	/** checks if a stanza should be stored for offline receivers */
+	public void receive(Stanza stanza) {
+		// according to XEP-0160 only certain stanzas should be stored
+		boolean store = false;
+		logger.debug("Received Stanza for offline storage:" +stanza.getClass().getSimpleName());
+		if (stanza instanceof MessageStanza) {
+			MessageStanza messageStanza = (MessageStanza) stanza;
+			MessageStanzaType type = messageStanza.getMessageType();
+			switch (type) {
+                case NORMAL:
+                case CHAT:
+                    store = true;
+                    break;
+                case GROUPCHAT:
+                case ERROR:
+                case HEADLINE:
+                    store = false;
+                    break;
+                default:
+                    throw new RuntimeException("unknown mesage type " + type);
+			}
+		} else if (stanza instanceof PresenceStanza) {
+			PresenceStanza presenceStanza = (PresenceStanza) stanza;
+			PresenceStanzaType type = presenceStanza.getPresenceType();
+			switch (type) {
+                case SUBSCRIBE:
+                case SUBSCRIBED:
+                case UNSUBSCRIBE:
+                case UNSUBSCRIBED:
+                    store = true;
+                    break;
+                case ERROR:
+                case PROBE:
+                case UNAVAILABLE:
+                    store = false;
+                    break;
+                default:
+                    throw new RuntimeException("unknown presence type " + type);
+			}
+		}
+		if (!store) {
+			logger.debug("Stanza is not intended for offline storage");
+			return;
+		}
+		logger.debug("Stanza will be stored offline");
+		storeStanza(stanza);
+	}
+	
+	/** does the actual storage mechanism */
+	protected abstract void storeStanza(Stanza stanza);
+
+}

Added: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/MemoryOfflineStorageProvider.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/MemoryOfflineStorageProvider.java?rev=1060416&view=auto
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/MemoryOfflineStorageProvider.java (added)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/MemoryOfflineStorageProvider.java Tue Jan 18 15:26:11 2011
@@ -0,0 +1,138 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */package org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.vysper.xmpp.addressing.Entity;
+import org.apache.vysper.xmpp.stanza.Stanza;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryOfflineStorageProvider extends AbstractOfflineStorageProvider {
+
+	final Logger logger = LoggerFactory.getLogger(MemoryOfflineStorageProvider.class);
+
+	private long timeout;
+
+	private Map<String, List<Entry>> offlineStorageMap = new HashMap<String, List<Entry>>();
+
+	public MemoryOfflineStorageProvider() {
+        this(7 * 24 * 3600 * 1000); // default to seven days;
+    }
+    
+	public MemoryOfflineStorageProvider(long timeout) {
+		this.timeout = timeout;
+		Thread checker = new Thread(new TimeoutChecker(), "OfflineTimeoutCheckerThread");
+		checker.start();
+	}
+
+	@Override
+	protected void storeStanza(Stanza stanza) {
+		Entity to = stanza.getTo();
+		String bareJID = to.getBareJID().getFullQualifiedName();
+		synchronized (offlineStorageMap) {
+			List<Entry> entriesForJID = offlineStorageMap.get(bareJID);
+			if (entriesForJID == null) {
+				entriesForJID = new ArrayList<Entry>();
+				offlineStorageMap.put(bareJID, entriesForJID);
+			}
+			entriesForJID.add(new Entry(stanza, new Date().getTime()));
+		}
+	}
+
+	public Collection<Stanza> getStanzasForBareJID(String bareJID) {
+		synchronized (offlineStorageMap) {
+			List<Entry> entries = offlineStorageMap.remove(bareJID);
+			if (entries == null) {
+				return Collections.emptyList();
+			} else {
+				
+				List<Stanza> stanzas = new ArrayList<Stanza>();
+				for (Entry entry : entries) {
+					// TODO add timestamp to messages
+					stanzas.add(entry.getStanza());
+				}
+				return stanzas;
+			}
+		}
+	}
+
+	private class Entry {
+
+		private Stanza stanza;
+
+		public Entry(Stanza stanza, long timeStamp) {
+			super();
+			this.stanza = stanza;
+			this.timeStamp = timeStamp;
+		}
+
+		private long timeStamp;
+
+		public long getTimeStamp() {
+			return timeStamp;
+		}
+
+		public Stanza getStanza() {
+			return stanza;
+		}
+
+	}
+
+	private class TimeoutChecker implements Runnable {
+		public void run() {
+			while (true) {
+				try {
+					Thread.sleep(60 * 100 * 1000);
+				} catch (InterruptedException e) {
+					logger.warn("Interrupted", e);
+				}
+				logger.debug("Running timeout checker for offline stanzas");
+				long timestamp = new Date().getTime() - timeout;
+				Set<String> jids = offlineStorageMap.keySet();
+				for (String jid : jids) {
+					synchronized (offlineStorageMap) {
+						List<Entry> entries = offlineStorageMap.get(jid);
+						if (entries != null) {
+							for (Iterator<Entry> it = entries.iterator(); it.hasNext();) {
+								Entry entry = it.next();
+								if (entry.getTimeStamp() < timestamp) {
+									logger.debug("Removed timed out offline stanza");
+									it.remove();
+								}
+							}
+						}
+
+					}
+				}
+			}
+
+		}
+	}
+
+}

Added: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/OfflineStorageProvider.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/OfflineStorageProvider.java?rev=1060416&view=auto
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/OfflineStorageProvider.java (added)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0160_offline_storage/OfflineStorageProvider.java Tue Jan 18 15:26:11 2011
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage;
+
+import java.util.Collection;
+
+import org.apache.vysper.storage.StorageProvider;
+import org.apache.vysper.xmpp.delivery.OfflineStanzaReceiver;
+import org.apache.vysper.xmpp.stanza.Stanza;
+
+public interface OfflineStorageProvider extends OfflineStanzaReceiver,
+		StorageProvider {
+	
+	public Collection<Stanza> getStanzasForBareJID(String bareJID);
+
+}

Modified: mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/XMPPServer.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/XMPPServer.java?rev=1060416&r1=1060415&r2=1060416&view=diff
==============================================================================
--- mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/XMPPServer.java (original)
+++ mina/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/XMPPServer.java Tue Jan 18 15:26:11 2011
@@ -33,11 +33,13 @@ import org.apache.vysper.xmpp.authorizat
 import org.apache.vysper.xmpp.authorization.SASLMechanism;
 import org.apache.vysper.xmpp.cryptography.BogusTrustManagerFactory;
 import org.apache.vysper.xmpp.cryptography.InputStreamBasedTLSContextFactory;
+import org.apache.vysper.xmpp.delivery.OfflineStanzaReceiver;
 import org.apache.vysper.xmpp.delivery.RecordingStanzaRelay;
 import org.apache.vysper.xmpp.delivery.StanzaRelayBroker;
 import org.apache.vysper.xmpp.delivery.inbound.DeliveringInternalInboundStanzaRelay;
 import org.apache.vysper.xmpp.delivery.inbound.DeliveringExternalInboundStanzaRelay;
 import org.apache.vysper.xmpp.modules.Module;
+import org.apache.vysper.xmpp.modules.extension.xep0160_offline_storage.OfflineStorageProvider;
 import org.apache.vysper.xmpp.modules.roster.RosterModule;
 import org.apache.vysper.xmpp.modules.servicediscovery.ServiceDiscoveryModule;
 import org.apache.vysper.xmpp.protocol.HandlerDictionary;
@@ -119,8 +121,9 @@ public class XMPPServer {
 
         AccountManagement accountManagement = (AccountManagement) storageProviderRegistry
                 .retrieve(AccountManagement.class);
+        OfflineStanzaReceiver offlineReceiver = (OfflineStanzaReceiver) storageProviderRegistry.retrieve(OfflineStorageProvider.class);
         DeliveringInternalInboundStanzaRelay internalStanzaRelay = new DeliveringInternalInboundStanzaRelay(serverEntity,
-                resourceRegistry, accountManagement);
+                resourceRegistry, accountManagement,offlineReceiver);
         DeliveringExternalInboundStanzaRelay externalStanzaRelay = new DeliveringExternalInboundStanzaRelay();
 
         StanzaRelayBroker stanzaRelayBroker = new StanzaRelayBroker();

Modified: mina/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java
URL: http://svn.apache.org/viewvc/mina/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java?rev=1060416&r1=1060415&r2=1060416&view=diff
==============================================================================
--- mina/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java (original)
+++ mina/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java Tue Jan 18 15:26:11 2011
@@ -67,7 +67,7 @@ public class DeliveringInteralInboundSta
 
         accountVerification = new AccountVerificationMock();
         stanzaRelay = new DeliveringInternalInboundStanzaRelay(EntityImpl.parse("vysper.org"), resourceRegistry,
-                accountVerification);
+                accountVerification, null);
     }
 
     public void testSimpleRelay() throws EntityFormatException, XMLSemanticError, DeliveryException {