You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by be...@apache.org on 2008/04/22 21:22:20 UTC

svn commit: r650619 - in /labs/vysper/src/main: config/ java/org/apache/vysper/xmpp/delivery/ java/org/apache/vysper/xmpp/modules/core/base/handler/ java/org/apache/vysper/xmpp/server/

Author: berndf
Date: Tue Apr 22 12:22:18 2008
New Revision: 650619

URL: http://svn.apache.org/viewvc?rev=650619&view=rev
Log:
[vysper] rework stanza relaying: make some things configurable, distinguish between internal/external delivery (ongoing) [LABS-124]

Added:
    labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/DeliveringStanzaRelay.java
      - copied, changed from r649961, labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java
Modified:
    labs/vysper/src/main/config/spring-config.xml
    labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/RecordingStanzaRelay.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaRelay.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/modules/core/base/handler/MessageHandler.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java

Modified: labs/vysper/src/main/config/spring-config.xml
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/config/spring-config.xml?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/config/spring-config.xml (original)
+++ labs/vysper/src/main/config/spring-config.xml Tue Apr 22 12:22:18 2008
@@ -33,11 +33,18 @@
     <!--
         Vysper Server singletons
     -->
-    <bean name="resourceRegistry" class="org.apache.vysper.xmpp.resourcebinding.ResourceRegistry" />
     
-    <bean id="stanzaRelay" class="org.apache.vysper.xmpp.delivery.MasterStanzaRelay" >
+    <bean id="internalRelay" class="org.apache.vysper.xmpp.delivery.DeliveringStanzaRelay" >
         <constructor-arg ref="resourceRegistry"/>
     </bean>
+
+    <bean id="stanzaRelay" class="org.apache.vysper.xmpp.delivery.MasterStanzaRelay" >
+        <property name="internalRelay" ref="internalRelay" />
+        <property name="externalRelay"><bean class="org.apache.vysper.xmpp.delivery.RecordingStanzaRelay"/></property>
+        <property name="serverRuntimeContext" ref="server" />
+    </bean>
+    
+    <bean name="resourceRegistry" class="org.apache.vysper.xmpp.resourcebinding.ResourceRegistry" />
     
     <bean id="serverFeatures" class="org.apache.vysper.xmpp.server.ServerFeatures">
         <constructor-arg value="true" />

Copied: labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/DeliveringStanzaRelay.java (from r649961, labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java)
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/DeliveringStanzaRelay.java?p2=labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/DeliveringStanzaRelay.java&p1=labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java&r1=649961&r2=650619&rev=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/DeliveringStanzaRelay.java Tue Apr 22 12:22:18 2008
@@ -25,31 +25,38 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.List;
 
 /**
  * relays stanzas to internal or external servers
  * TODO: but right now, it only delivers to internal sessions
  */
-public class MasterStanzaRelay implements StanzaRelay {
+public class DeliveringStanzaRelay implements StanzaRelay {
 
     protected ResourceRegistry resourceRegistry;
-    protected final ExecutorService executor = Executors.newSingleThreadExecutor();
+    protected ExecutorService executor; 
 
-    public MasterStanzaRelay(ResourceRegistry resourceRegistry) {
+    public DeliveringStanzaRelay(ResourceRegistry resourceRegistry) {
         this.resourceRegistry = resourceRegistry;
+        int coreThreadCount = 10;
+        int maxThreadCount = 20;
+        int threadTimeoutSeconds = 2 * 60 * 1000;
+        this.executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, threadTimeoutSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
     }
     
     public void relay(Entity receiver, Stanza stanza) throws DeliveryException {
-        executor.submit(new Relay(receiver, stanza));
+        Future<RelayResult> resultFuture = executor.submit(new Relay(receiver, stanza));
     }
 
-    public boolean receiverExists(Entity receiver) {
-        // TODO this is a temp solution, since we only check for 'online' users yet
+    public boolean receiverIsOnline(Entity receiver) {
         return !resourceRegistry.getSessions(receiver).isEmpty();
     }
        
-    private class Relay implements Callable<Boolean> {
+    private class Relay implements Callable<RelayResult> {
         private Entity receiver;
         private Stanza stanza;
 
@@ -66,14 +73,40 @@
             return stanza;
         }
 
-        public Boolean call() throws Exception {
+        public RelayResult call() {
             List<SessionContext> receivingSessions = resourceRegistry.getSessions(receiver);
             for (SessionContext sessionContext : receivingSessions) {
-                StanzaWriter stanzaWriter = sessionContext.getResponseWriter();
+                StanzaWriter stanzaWriter = null;
+                try {
+                    stanzaWriter = sessionContext.getResponseWriter();
+                } catch (Exception e) {
+                    return new RelayResult(e);
+                }
                 stanzaWriter.write(stanza);
             }
-            return Boolean.TRUE;
+            return new RelayResult();
         }
     }
        
-}
+    private class RelayResult {
+        private Throwable processingError;
+        private boolean relayed;
+
+        public RelayResult(Throwable processingError) {
+            this.processingError = processingError;
+            this.relayed = false;
+        }
+
+        public RelayResult() {
+            this.relayed = true;
+        }
+
+        public Throwable getProcessingError() {
+            return processingError;
+        }
+
+        public boolean isRelayed() {
+            return relayed;
+        }
+    }
+}
\ No newline at end of file

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/MasterStanzaRelay.java Tue Apr 22 12:22:18 2008
@@ -17,63 +17,41 @@
 package org.apache.vysper.xmpp.delivery;
 
 import org.apache.vysper.xmpp.addressing.Entity;
-import org.apache.vysper.xmpp.resourcebinding.ResourceRegistry;
+import org.apache.vysper.xmpp.server.ServerRuntimeContext;
 import org.apache.vysper.xmpp.stanza.Stanza;
-import org.apache.vysper.xmpp.server.SessionContext;
-import org.apache.vysper.xmpp.writer.StanzaWriter;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.List;
 
 /**
  * relays stanzas to internal or external servers
- * TODO: but right now, it only delivers to internal sessions
  */
 public class MasterStanzaRelay implements StanzaRelay {
 
-    protected ResourceRegistry resourceRegistry;
-    protected final ExecutorService executor = Executors.newSingleThreadExecutor();
+    protected StanzaRelay internalRelay;
+    protected StanzaRelay externalRelay;
+    protected ServerRuntimeContext serverRuntimeContext;
 
-    public MasterStanzaRelay(ResourceRegistry resourceRegistry) {
-        this.resourceRegistry = resourceRegistry;
+    public void setInternalRelay(StanzaRelay internalRelay) {
+        this.internalRelay = internalRelay;
     }
-    
-    public void relay(Entity receiver, Stanza stanza) throws DeliveryException {
-        executor.submit(new Relay(receiver, stanza));
+
+    public void setExternalRelay(StanzaRelay externalRelay) {
+        this.externalRelay = externalRelay;
     }
 
-    public boolean receiverExists(Entity receiver) {
-        // TODO this is a temp solution, since we only check for 'online' users yet
-        return !resourceRegistry.getSessions(receiver).isEmpty();
+    public void setServerRuntimeContext(ServerRuntimeContext serverRuntimeContext) {
+        this.serverRuntimeContext = serverRuntimeContext;
     }
-       
-    private class Relay implements Callable<Boolean> {
-        private Entity receiver;
-        private Stanza stanza;
-
-        Relay(Entity receiver, Stanza stanza) {
-            this.receiver = receiver;
-            this.stanza = stanza;
-        }
 
-        public Entity getReceiver() {
-            return receiver;
-        }
+    public void relay(Entity receiver, Stanza stanza) throws DeliveryException {
+        String domain = receiver.getDomain();
 
-        public Stanza getStanza() {
-            return stanza;
-        }
+        boolean relayToExternal = serverRuntimeContext.getServerFeatures().isRelayingToFederationServers();
 
-        public Boolean call() throws Exception {
-            List<SessionContext> receivingSessions = resourceRegistry.getSessions(receiver);
-            for (SessionContext sessionContext : receivingSessions) {
-                StanzaWriter stanzaWriter = sessionContext.getResponseWriter();
-                stanzaWriter.write(stanza);
-            }
-            return Boolean.TRUE;
+        if (domain.equals(serverRuntimeContext.getServerEnitity().getDomain())) {
+            internalRelay.relay(receiver, stanza);
+        } else {
+            if (!relayToExternal) throw new IllegalStateException("this server is not relaying to external currently");
+            externalRelay.relay(receiver, stanza);
         }
     }
-       
+
 }

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/RecordingStanzaRelay.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/RecordingStanzaRelay.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/RecordingStanzaRelay.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/RecordingStanzaRelay.java Tue Apr 22 12:22:18 2008
@@ -32,13 +32,10 @@
     private boolean acceptingMode = true;
 
     public void relay(Entity receiver, Stanza stanza) throws DeliveryException {
+        if (!acceptingMode) return;
         entityStanzaPairs.add(new Pair(receiver, stanza));
     }
 
-    public boolean receiverExists(Entity receiver) {
-        return acceptingMode;
-    }
-    
     public Iterator<Pair> iterator() {
         return entityStanzaPairs.iterator();
     }

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java Tue Apr 22 12:22:18 2008
@@ -36,11 +36,8 @@
 
     public void relay(Entity receiver, Stanza stanza) throws DeliveryException {
         if (receiver == null) throw new DeliveryException("receiver cannot be NULL");
-        if (!receiverExists(receiver)) throw new DeliveryException("cannot find receiver" + receiver.getFullQualifiedName());
+        if (receiverMap.get(receiver) == null) throw new DeliveryException("cannot find receiver" + receiver.getFullQualifiedName());
         receiverMap.get(receiver).deliver(stanza);
     }
 
-    public boolean receiverExists(Entity receiver) {
-        return receiverMap.get(receiver) != null;
-    }
 }

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaRelay.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaRelay.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaRelay.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/delivery/StanzaRelay.java Tue Apr 22 12:22:18 2008
@@ -27,6 +27,4 @@
 
     public void relay(Entity receiver, Stanza stanza) throws DeliveryException;
 
-    public boolean receiverExists(Entity receiver);
-
 }

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/modules/core/base/handler/MessageHandler.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/modules/core/base/handler/MessageHandler.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/modules/core/base/handler/MessageHandler.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/modules/core/base/handler/MessageHandler.java Tue Apr 22 12:22:18 2008
@@ -81,10 +81,6 @@
         // TODO check if the user should be on this server. relay if not.
         
         StanzaRelay stanzaRelay = sessionContext.getServerRuntimeContext().getStanzaRelay();
-        if (!stanzaRelay.receiverExists(to)) {
-            // TODO return error stanza
-            return null;
-        }
         try {
             stanzaRelay.relay(to, stanza);
         } catch (Exception e) {

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java?rev=650619&r1=650618&r2=650619&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java Tue Apr 22 12:22:18 2008
@@ -28,8 +28,24 @@
 public class ServerFeatures {
 
     private boolean startTLSRequired = true;
+
     private final List<SASLMechanism> authenticationMethods = new ArrayList<SASLMechanism>();
 
+    /**
+     * flag indicating whether message stanzas are relayed (sent to internal/external recipients)
+     */
+    private boolean relayMessages = true;
+
+    /**
+     * flag indicating whether message stanzas are relayed (sent to internal/external recipients)
+     */
+    private boolean relayPresence = true;
+
+    /**
+     * flag indicating whether stanzas are sent to remote servers or not
+     */
+    private boolean relayToFederationServers = false; 
+
     public ServerFeatures() {
         // default constructor
     }
@@ -52,5 +68,29 @@
 
     public List<SASLMechanism> getAuthenticationMethods() {
         return Collections.unmodifiableList(authenticationMethods);
+    }
+
+    public boolean isRelayingMessages() {
+        return relayMessages;
+    }
+
+    public void setRelayingMessages(boolean relayMessages) {
+        this.relayMessages = relayMessages;
+    }
+
+    public boolean isRelayingPresence() {
+        return relayPresence;
+    }
+
+    public void setRelayingPresence(boolean relayPresence) {
+        this.relayPresence = relayPresence;
+    }
+
+    public boolean isRelayingToFederationServers() {
+        return relayToFederationServers;
+    }
+
+    public void setRelayingToFederationServers(boolean relayToFederationServers) {
+        this.relayToFederationServers = relayToFederationServers;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org