You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by be...@apache.org on 2013/07/02 16:58:38 UTC

[3/3] git commit: VYSPER-344: overall improvements of server-to-server: add logging, harden routing and exception handling, fix receiving of keep-alive pings

VYSPER-344: overall improvements of server-to-server: add logging, harden routing and exception handling.
Fix receiving of keep-alive pings.


Project: http://git-wip-us.apache.org/repos/asf/mina-vysper/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-vysper/commit/ef2276ca
Tree: http://git-wip-us.apache.org/repos/asf/mina-vysper/tree/ef2276ca
Diff: http://git-wip-us.apache.org/repos/asf/mina-vysper/diff/ef2276ca

Branch: refs/heads/master
Commit: ef2276cad7d6fbbc5cf41ea025d5e2751a21f38d
Parents: 28ca5c5
Author: Bernd Fondermann <be...@brainlounge.de>
Authored: Tue Jul 2 16:57:29 2013 +0200
Committer: Bernd Fondermann <be...@brainlounge.de>
Committed: Tue Jul 2 16:57:29 2013 +0200

----------------------------------------------------------------------
 .../vysper/mina/XmppIoHandlerAdapter.java       |   2 +-
 .../server/s2s/DefaultXMPPServerConnector.java  | 302 ++++++++-----------
 .../s2s/DefaultXMPPServerConnectorRegistry.java |  21 +-
 .../server/s2s/ServerConnectorIoHandler.java    |  81 +++++
 .../xmpp/server/s2s/XMPPServerConnector.java    |   6 +
 5 files changed, 239 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java b/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
index db1f0b7..6332033 100644
--- a/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
+++ b/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXParseException;
 
 /**
- *
+ * handler for client-to-server sessions
  * @author The Apache MINA Project (dev@mina.apache.org)
  */
 public class XmppIoHandlerAdapter implements IoHandler {

http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
index 6752ee1..690e069 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
@@ -18,7 +18,6 @@
  *
  */
 package org.apache.vysper.xmpp.server.s2s;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Arrays;
@@ -31,10 +30,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
 import org.apache.mina.core.future.ConnectFuture;
 import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.apache.vysper.mina.MinaBackedSessionContext;
 import org.apache.vysper.mina.StanzaLoggingFilter;
@@ -48,6 +45,7 @@ import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbResult
 import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbVerifyHandler;
 import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DialbackIdGenerator;
 import org.apache.vysper.xmpp.protocol.NamespaceURIs;
+import org.apache.vysper.xmpp.protocol.ProtocolException;
 import org.apache.vysper.xmpp.protocol.ResponseStanzaContainer;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -70,10 +68,17 @@ import org.slf4j.LoggerFactory;
 public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerConnector {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnector.class);
+
+    private final static List<StanzaHandler> S2S_HANDSHAKE_HANDLERS = Arrays.asList(
+            new DbVerifyHandler(),
+            new DbResultHandler(),
+            new TlsProceedHandler(),
+            new FeaturesHandler()
+    ); 
     
     private ServerRuntimeContext serverRuntimeContext;
     private MinaBackedSessionContext sessionContext;
-    private Entity otherServer;
+    private Entity remoteServer;
     private SessionStateHolder sessionStateHolder = new SessionStateHolder();
     private IoConnector connector;
     
@@ -89,10 +94,12 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
     private SessionStateHolder dialbackSessionStateHolder;
     
     private Timer pingTimer;
-    
-    public DefaultXMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
+    protected ServerConnectorIoHandler serverConnectorIoHandler;
+    protected final CountDownLatch authenticatedLatch = new CountDownLatch(1);
+
+    public DefaultXMPPServerConnector(Entity remoteServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
         this.serverRuntimeContext = serverRuntimeContext;
-        this.otherServer = otherServer;
+        this.remoteServer = remoteServer;
         this.dialbackSessionContext = dialbackSessionContext;
         this.dialbackSessionStateHolder = dialbackSessionStateHolder;
     }
@@ -101,29 +108,26 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
      * Connect and authenticate the XMPP server connector
      */
     public synchronized void start() throws RemoteServerNotFoundException, RemoteServerTimeoutException {
-        LOG.info("Starting XMPP server connector to {}", otherServer);
+        LOG.info("Starting XMPP server connector to {}", remoteServer);
 
-        // make this method synchronous
-        final CountDownLatch authenticatedLatch = new CountDownLatch(1);
-        
         boolean successfullyConnected = false;
         
         XmppEndpointResolver resolver = new XmppEndpointResolver();
-        List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
+        List<ResolvedAddress> addresses = resolver.resolveXmppServer(remoteServer.getDomain());
         
         Throwable lastException = null;
         
         if(!addresses.isEmpty()) {
-            LOG.info("resolved {} address(es) for {}", addresses.size(), otherServer);
+            LOG.info("resolved {} address(es) for {}", addresses.size(), remoteServer);
             for(ResolvedAddress address : addresses) {
                 final InetSocketAddress ipAddress = address.getAddress();
-                LOG.info("Connecting to XMPP server {} at {}", otherServer, ipAddress);
+                LOG.info("Connecting to XMPP server {} at {}", remoteServer, ipAddress);
                 
-                connector = createConnector(authenticatedLatch);
+                connector = createConnector();
                 ConnectFuture connectFuture = connector.connect(ipAddress);
                 if(connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
                     // success on the TCP/IP level, now wait for the XMPP handshake
-                    LOG.info("XMPP server {} connected at {}", otherServer, ipAddress);
+                    LOG.info("XMPP server {} connected at {}", remoteServer, ipAddress);
                     try {
                         if(authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
                             // success, break out of connect loop
@@ -131,15 +135,15 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
                             break;
                         } else {
                             // attempt next
-                            LOG.warn("XMPP handshake with {} at {} timed out", otherServer, ipAddress);
+                            LOG.warn("XMPP handshake with {} at {} timed out", remoteServer, ipAddress);
                         }
                     } catch (InterruptedException e) {
-                        throw new RemoteServerTimeoutException("Connection to " + otherServer + " was interrupted", e);
+                        throw new RemoteServerTimeoutException("Connection to " + remoteServer + " was interrupted", e);
                     }
                 } 
 
                 lastException = connectFuture.getException();
-                LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + ipAddress, connectFuture.getException());
+                LOG.warn("Failed connecting to XMPP server " + remoteServer + " at " + ipAddress, connectFuture.getException());
                 disposeAndNullifyConnector();
             }
         } else {
@@ -148,7 +152,7 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
         }
         
         if(!successfullyConnected) {
-            String exceptionMsg = "Failed to connect to XMPP server at " + otherServer;
+            String exceptionMsg = "Failed to connect to XMPP server at " + remoteServer;
             
             if(lastException instanceof UnresolvedAddressException) {
                 throw new RemoteServerNotFoundException(exceptionMsg);
@@ -166,13 +170,14 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
         connector = null;
     }
 
-    private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+    private NioSocketConnector createConnector() {
         NioSocketConnector connector = new NioSocketConnector();
         DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
         filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
         filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
         connector.setFilterChainBuilder(filterChainBuilder);
-        connector.setHandler(new ConnectorIoHandler(authenticatedLatch));
+        serverConnectorIoHandler = new ServerConnectorIoHandler(remoteServer, this);
+        connector.setHandler(serverConnectorIoHandler);
         return connector;
     }
     
@@ -184,6 +189,105 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
         }
     }
 
+    private StanzaHandler lookupS2SHandler(Stanza stanza) {
+        for (StanzaHandler handler : S2S_HANDSHAKE_HANDLERS) {
+            if (handler.verify(stanza)) {
+                return handler;
+            }
+        }
+        return null;
+    }
+    
+    public void handleReceivedStanza(Stanza stanza) {
+        
+        // check for basic stanza handlers
+        StanzaHandler s2sHandler = lookupS2SHandler(stanza);
+        
+        if(s2sHandler != null) {
+            ResponseStanzaContainer container;
+            try {
+                container = s2sHandler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
+            } catch (ProtocolException e) {
+                return;
+            }
+            if(container != null && container.hasResponse()) {
+                sessionContext.write(container.getResponseStanza());
+            }
+            
+            if(sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
+                LOG.info("XMPP server connector to {} authenticated", remoteServer);
+                authenticatedLatch.countDown();
+                
+                // connection established, start pinging
+                startPinging();
+            }
+        // none of the handlers matched, stream start is handled separately
+        } else if(stanza.getName().equals("stream")) {
+            sessionContext.setSessionId(stanza.getAttributeValue("id"));
+            sessionContext.setInitiatingEntity(remoteServer);
+            
+            String version = stanza.getAttributeValue("version");
+            if(version == null) {
+                // old protocol, assume dialback
+                String dailbackId = new DialbackIdGenerator().generate(remoteServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());
+                
+                Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
+                    .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
+                    .addAttribute("to", remoteServer.getDomain())
+                    .addText(dailbackId)
+                    .build();
+                write(dbResult);
+            }
+            
+            if(dialbackSessionContext != null) {
+                // connector is being used for dialback verification, don't do further authentication
+                sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
+                sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);
+             
+                sessionContext.setInitiatingEntity(remoteServer);
+                sessionStateHolder.setState(SessionState.AUTHENTICATED);
+                authenticatedLatch.countDown();
+            }
+        } else {
+            
+            if(sessionStateHolder.getState() != SessionState.AUTHENTICATED) {
+                LOG.warn("regular stanza sent before s2s session to {} was authenticated, closing", remoteServer);
+                sessionContext.close();
+                return;
+            }
+            // only handle stanzas addressed directly to this server
+            if (!serverRuntimeContext.getServerEnitity().equals(stanza.getTo())) {
+                LOG.info("not handling messages to clients here received from {} to {}", remoteServer, stanza.getTo());
+                sessionContext.close();
+                return;
+            }
+            
+            serverRuntimeContext.getStanzaProcessor().processStanza(serverRuntimeContext, sessionContext, stanza, sessionStateHolder);
+        }
+    }
+
+    public void handleSessionSecured() {
+        // connection secured, send stream opener
+        sessionStateHolder.setState(SessionState.ENCRYPTED);
+
+        LOG.info("XMPP server connector to {} secured using TLS", remoteServer);
+        LOG.debug("XMPP server connector to {} restarting stream", remoteServer);
+
+        sessionContext.setIsReopeningXMLStream();
+
+        Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), remoteServer, XMPPVersion.VERSION_1_0, sessionContext);
+
+        sessionContext.write(opener);
+    }
+
+    public void handleSessionOpened(IoSession session) {
+        LOG.info("XMPP server session opened to {}", remoteServer);
+        sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
+        sessionStateHolder.setState(SessionState.INITIATED);
+        Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), remoteServer, XMPPVersion.VERSION_1_0, sessionContext);
+        sessionContext.write(opener);
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -195,13 +299,14 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
      * {@inheritDoc}
      */
     public void close() {
+        LOG.info("XMPP server connector socket closed, closing connector (current status: {})", closed ? "closed" : "open");
         try {
             if(!closed) {
-                LOG.info("XMPP server connector to {} closing", otherServer);
+                LOG.info("XMPP server connector to {} is closing", remoteServer);
                 if (pingTimer != null) pingTimer.cancel();
                 sessionContext.close();
                 disposeAndNullifyConnector();
-                LOG.info("XMPP server connector to {} closed", otherServer);
+                LOG.info("XMPP server connector to {} is closed", remoteServer);
             }
         } finally {
             closed = true;
@@ -219,7 +324,7 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
      * {@inheritDoc}
      */
     public void timeout() {
-        LOG.debug("XMPP server connector to {} timed out, closing", otherServer);
+        LOG.debug("XMPP server connector to {} timed out, closing", remoteServer);
         close();
     }
 
@@ -231,153 +336,12 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
         return closed;
     }
 
-    private final class ConnectorIoHandler extends IoHandlerAdapter {
-        
-        private final List<StanzaHandler> handlers = Arrays.asList(
-            new DbVerifyHandler(),
-            new DbResultHandler(),
-            new TlsProceedHandler(),
-            new FeaturesHandler()
-            ); 
-        
-        private final CountDownLatch authenticatedLatch;
-
-        private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
-            this.authenticatedLatch = authenticatedLatch;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
-            if (cause instanceof IOException) {
-                if (cause instanceof javax.net.ssl.SSLHandshakeException) {
-                    LOG.warn("failed to complete SSL handshake with server {}: {}", otherServer, cause.getMessage());
-                } else if (cause instanceof javax.net.ssl.SSLException) {
-                    LOG.warn("failure in SSL with server {}: {}", otherServer, cause.getMessage());
-                } else {
-                    LOG.info("I/O exception with server {}: {}", otherServer, cause.getMessage());
-                }
-                close();
-            } else {
-                LOG.warn("Exception {} thrown by XMPP server connector to " + otherServer + ", probably a bug in Vysper: {}", cause.getClass().getName(), cause.getMessage());
-            }
-        }
-
-        private StanzaHandler lookupHandler(Stanza stanza) {
-            for (StanzaHandler handler : handlers) {
-                if (handler.verify(stanza)) {
-                    return handler;
-                }
-            }
-            return null;
-        }
-        
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void messageReceived(IoSession session, Object message) throws Exception {
-            if(message == SslFilter.SESSION_SECURED) {
-                // connection secured, send stream opener
-                sessionStateHolder.setState(SessionState.ENCRYPTED);
-                
-                LOG.info("XMPP server connector to {} secured using TLS", otherServer);
-                LOG.debug("XMPP server connector to {} restarting stream", otherServer);
-                
-                sessionContext.setIsReopeningXMLStream();
-                
-                Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-                
-                sessionContext.write(opener);
-            } else if(message == SslFilter.SESSION_UNSECURED) {
-                // unsecured, closing
-                close();
-            } else if(message instanceof Stanza) {
-                Stanza stanza = (Stanza) message;
-                
-                // check for basic stanza handlers
-                StanzaHandler handler = lookupHandler(stanza);
-                
-                if(handler != null) {
-                    ResponseStanzaContainer container = handler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
-                    if(container != null && container.hasResponse()) {
-                        sessionContext.write(container.getResponseStanza());
-                    }
-                    
-                    if(sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
-                        LOG.info("XMPP server connector to {} authenticated", otherServer);
-                        authenticatedLatch.countDown();
-                        
-                        // connection established, start pinging
-                        startPinging();
-                    }
-                // none of the handlers matched, stream start is handled separately
-                } else if(stanza.getName().equals("stream")) {
-                    sessionContext.setSessionId(stanza.getAttributeValue("id"));
-                    sessionContext.setInitiatingEntity(otherServer);
-                    
-                    String version = stanza.getAttributeValue("version");
-                    if(version == null) {
-                        // old protocol, assume dialback
-                        String dailbackId = new DialbackIdGenerator().generate(otherServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());
-                        
-                        Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
-                            .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
-                            .addAttribute("to", otherServer.getDomain())
-                            .addText(dailbackId)
-                            .build();
-                        write(dbResult);
-                    }
-                    
-                    if(dialbackSessionContext != null) {
-                        // connector is being used for dialback verification, don't do further authentication
-                        sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
-                        sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);
-                     
-                        sessionContext.setInitiatingEntity(otherServer);
-                        sessionStateHolder.setState(SessionState.AUTHENTICATED);
-                        authenticatedLatch.countDown();
-                    }
-                } else {
-                    // TODO other stanzas coming here?
-                    if (message != null) LOG.warn("unhandled stanza in S2S ConnectorIoHandler: " + message);
-                }
-            } else {
-                throw new RuntimeException("Only handles SSL events and stanzas, got: " + message.getClass());
-            }
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void sessionClosed(IoSession session) throws Exception {
-            // Socket was closed, make sure we close the connector
-            LOG.info("XMPP server connector socket closed, closing connector");
-            close();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void sessionOpened(IoSession session) throws Exception {
-            LOG.info("XMPP server session opened to {}", otherServer);
-            sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
-            sessionStateHolder.setState(SessionState.INITIATED);
-            Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-            
-            sessionContext.write(opener);
-        }
-    }
-
     private class PingTask extends TimerTask {
+        @Override
         public void run() {
             XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
-            LOG.info("pinging federated XMPP server {}", otherServer);
-            pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, DefaultXMPPServerConnector.this);
+            LOG.info("pinging federated XMPP server {}", remoteServer);
+            pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), remoteServer, pingTimeout, DefaultXMPPServerConnector.this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
index bda53c4..3cbec0d 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
@@ -28,6 +28,8 @@ import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.server.ServerRuntimeContext;
 import org.apache.vysper.xmpp.server.SessionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link XMPPServerConnectorRegistry} 
@@ -36,6 +38,8 @@ import org.apache.vysper.xmpp.server.SessionContext;
  */
 public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRegistry {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnectorRegistry.class);
+    
     private ServerRuntimeContext serverRuntimeContext;
     private Map<Entity, XMPPServerConnector> connectors = new ConcurrentHashMap<Entity, XMPPServerConnector>();
     
@@ -56,10 +60,14 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
         } 
         
         if(connector == null) {
+            LOG.debug("starting s2s connector to " + server);
             connector = createConnector(server, serverRuntimeContext, null, null);
-            connector.start();
+            // TODO start async, because we are in a synchronized method
+            connector.start(); // this can take some time to complete
+            LOG.debug("started s2s connector to " + server);
 
             connectors.put(server, connector);
+            LOG.debug("s2s connector registry size is now " + connectors.size());
         }
         
         return connector;        
@@ -67,8 +75,10 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
     
     @SpecCompliant(spec = "draft-ietf-xmpp-3920bis-22", section = "10.4", status = SpecCompliant.ComplianceStatus.FINISHED, coverage = SpecCompliant.ComplianceCoverage.COMPLETE)
     public synchronized XMPPServerConnector connectForDialback(Entity server, SessionContext orginalSessionContext, SessionStateHolder originalSessionStateHolder) throws RemoteServerNotFoundException, RemoteServerTimeoutException {
+        LOG.debug("starting s2s connector for dialback to " + server);
         XMPPServerConnector connector = createConnector(server, serverRuntimeContext, orginalSessionContext, originalSessionStateHolder);
         connector.start();
+        LOG.debug("started s2s connector for dialback to " + server);
         return connector;
     }
     
@@ -79,9 +89,14 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
     /* (non-Javadoc)
      * @see org.apache.vysper.xmpp.server.s2s.XMPPServerConnectorRegistry#close()
      */
-    public void close() {
+    public synchronized void close() {
+        LOG.debug("closing now all {} s2s connectors", connectors.size());
         for(XMPPServerConnector connector : connectors.values()) {
-            connector.close();
+            try {
+                connector.close();
+            } catch (Throwable e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
new file mode 100644
index 0000000..e1ed160
--- /dev/null
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
@@ -0,0 +1,81 @@
+package org.apache.vysper.xmpp.server.s2s;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.ssl.SslFilter;
+import org.apache.vysper.xmpp.addressing.Entity;
+import org.apache.vysper.xmpp.stanza.Stanza;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * handler for server-to-server connections
+*/
+public class ServerConnectorIoHandler extends IoHandlerAdapter {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(ServerConnectorIoHandler.class);
+    
+    protected final Entity remoteServer;
+    protected final XMPPServerConnector serverConnector;
+
+    ServerConnectorIoHandler(Entity remoteServer, XMPPServerConnector serverConnector) {
+        this.remoteServer = remoteServer;
+        this.serverConnector = serverConnector;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+        if (cause instanceof IOException) {
+            if (cause instanceof javax.net.ssl.SSLHandshakeException) {
+                LOG.warn("failed to complete SSL handshake with server {}: {}", remoteServer, cause.getMessage());
+            } else if (cause instanceof javax.net.ssl.SSLException) {
+                LOG.warn("failure in SSL with server {}: {}", remoteServer, cause.getMessage());
+            } else {
+                LOG.info("I/O exception with server {}: {}", remoteServer, cause.getMessage());
+            }
+            serverConnector.close();
+        } else {
+            LOG.warn("Exception {} thrown by XMPP server connector to " + remoteServer + ", probably a bug in Vysper: {}", cause.getClass().getName(), cause.getMessage());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void messageReceived(IoSession session, Object message) {
+        if(message == SslFilter.SESSION_SECURED) {
+            serverConnector.handleSessionSecured();
+        } else if(message == SslFilter.SESSION_UNSECURED) {
+            // unsecured, closing
+            serverConnector.close();
+        } else if(message instanceof Stanza) {
+            Stanza stanza = (Stanza) message;
+            serverConnector.handleReceivedStanza(stanza);
+        } else {
+            throw new RuntimeException("Only handles SSL events and stanzas, got: " + message.getClass());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void sessionClosed(IoSession session) throws Exception {
+        // Socket was closed, make sure we close the connector
+        serverConnector.close();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void sessionOpened(IoSession session) throws Exception {
+        serverConnector.handleSessionOpened(session);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
index f7308c9..7670e3f 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
@@ -19,6 +19,7 @@
  */
 package org.apache.vysper.xmpp.server.s2s;
 
+import org.apache.mina.core.session.IoSession;
 import org.apache.vysper.xmpp.delivery.failure.RemoteServerNotFoundException;
 import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
 import org.apache.vysper.xmpp.stanza.Stanza;
@@ -49,4 +50,9 @@ public interface XMPPServerConnector extends StanzaWriter {
      */
     void write(Stanza stanza);
 
+    void handleReceivedStanza(Stanza stanza);
+
+    void handleSessionSecured();
+
+    void handleSessionOpened(IoSession session);
 }
\ No newline at end of file