You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ng...@apache.org on 2010/12/29 22:54:41 UTC

svn commit: r1053752 - in /mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s: XMPPServerConnector.java XMPPServerConnectorRegistry.java XmppEndpointResolver.java

Author: ngn
Date: Wed Dec 29 21:54:40 2010
New Revision: 1053752

URL: http://svn.apache.org/viewvc?rev=1053752&view=rev
Log:
Handle reconnecting to all registered addresses for an XMPP server, or failing if all fail

Modified:
    mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
    mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
    mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java

Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java Wed Dec 29 21:54:40 2010
@@ -1,11 +1,14 @@
 package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
 import org.apache.mina.core.service.IoConnector;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
@@ -25,6 +28,7 @@ import org.apache.vysper.xmpp.server.Ser
 import org.apache.vysper.xmpp.server.SessionState;
 import org.apache.vysper.xmpp.server.XMPPVersion;
 import org.apache.vysper.xmpp.server.response.ServerResponses;
+import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.stanza.StanzaBuilder;
 import org.apache.vysper.xmpp.writer.StanzaWriter;
@@ -39,8 +43,11 @@ public class XMPPServerConnector impleme
     private MinaBackedSessionContext sessionContext;
     private Entity otherServer;
     private SessionStateHolder sessionStateHolder = new SessionStateHolder();
-    private IoConnector connector = new NioSocketConnector();
+    private IoConnector connector;
     
+    private int connectTimeout = 30000;
+    private int xmppHandshakeTimeout = 30000;
+
     private int pingPeriod = 30000;
     private int pingTimeout = 10000;
     
@@ -48,138 +55,66 @@ public class XMPPServerConnector impleme
     
     private Timer pingTimer = new Timer("pingtimer", true);
     
-    private class PingTask extends TimerTask {
-        public void run() {
-            XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
-            if(pingModule != null) {
-                pingModule.ping(XMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, XMPPServerConnector.this);
-            }
-        }
-    }
-    
     public XMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext) {
         this.serverRuntimeContext = serverRuntimeContext;
         this.otherServer = otherServer;
-
-        DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
-        filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
-        filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
-        connector.setFilterChainBuilder(filterChainBuilder);
     }
 
-    public synchronized void start() {
+    public synchronized void start() throws IOException {
         LOG.info("Starting XMPP server connector to {}", otherServer);
+
+        // make this method synchronous
         final CountDownLatch authenticatedLatch = new CountDownLatch(1);
         
-        connector.setHandler(new IoHandlerAdapter() {
-            @Override
-            public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
-                LOG.info("Exception thrown by XMPP server connector to {}, probably a bug in Vysper", otherServer);
-            }
-
-            @Override
-            public void messageReceived(IoSession session, Object message) throws Exception {
-                if(message == SslFilter.SESSION_SECURED) {
-                    // TODO handle unsecure
-                    // connection secured, send stream opener
-                    sessionStateHolder.setState(SessionState.ENCRYPTED);
-                    
-                    LOG.info("XMPP server connector to {} secured using TLS", otherServer);
-                    LOG.debug("XMPP server connector to {} restarting stream", otherServer);
-                    
-                    sessionContext.setIsReopeningXMLStream();
-                    
-                    Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-                    
-                    sessionContext.write(opener);
-                } else {
-                    Stanza msg = (Stanza) message;
-                    
-                    if(msg.getName().equals("stream")) {
-                        sessionContext.setSessionId(msg.getAttributeValue("id"));
-                    } else if(msg.getName().equals("features")) {
-                        if(startTlsSupported(msg)) {
-                            LOG.info("XMPP server connector to {} is starting TLS", otherServer);
-                            Stanza startTlsStanza = new StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
-                            
-                            sessionContext.write(startTlsStanza);
-                            
-                        } else if(dialbackSupported(msg)) {
-                            Entity originating = serverRuntimeContext.getServerEnitity();
-    
-                            String dailbackId = new DailbackIdGenerator().generate(otherServer, originating, sessionContext.getSessionId());
-                            
-                            Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
-                                .addAttribute("from", originating.getDomain())
-                                .addAttribute("to", otherServer.getDomain())
-                                .addText(dailbackId)
-                                .build();
-                            
-                            sessionContext.write(dbResult);
-                        } else {
-                            throw new RuntimeException("Unsupported features");
-                        }
-                    } else if(msg.getName().equals("result") && NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
-                        // TODO check and handle dailback result
-                        sessionStateHolder.setState(SessionState.AUTHENTICATED);
-                        
-                        LOG.info("XMPP server connector to {} authenticated using dialback", otherServer);
-                        authenticatedLatch.countDown();
-                        
-                        // connection established, start pinging
-                        startPinging();
-                    } else if(msg.getName().equals("proceed") && NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) {
-                        sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
-                        
-                        LOG.debug("XMPP server connector to {} switching to TLS", otherServer);
-                        sessionContext.switchToTLS(false, true);
+        boolean successfullyConnected = false;
+        
+        XmppEndpointResolver resolver = new XmppEndpointResolver();
+        List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
+        
+        for(ResolvedAddress address : addresses) {
+            LOG.info("Connecting to XMPP server {} at {}", otherServer, address.getAddress());
+            
+            connector = createConnector(authenticatedLatch);
+            ConnectFuture connectFuture = connector.connect(address.getAddress());
+            if(connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
+                // success on the TCP/IP level, now wait for the XMPP handshake
+
+                try {
+                    if(authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
+                        // success, break out of connect loop
+                        successfullyConnected = true;
+                        break;
                     } else {
-                        // TODO other stanzas coming here?
+                        // attempt next
+                        LOG.warn("XMPP handshake with {} at () timed out", otherServer, address.getAddress());
                     }
+                } catch (InterruptedException e) {
+                    throw new IOException("Connection to " + otherServer + " was interrupted", e);
                 }
-            }
-            
-            private boolean startTlsSupported(Stanza stanza) {
-                return !stanza.getInnerElementsNamed("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
-            }
-
-            private boolean dialbackSupported(Stanza stanza) {
-                // TODO check for dialback namespace
-                return !stanza.getInnerElementsNamed("dialback", NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
-            }
+            } 
 
-            @Override
-            public void sessionClosed(IoSession session) throws Exception {
-                // Socket was closed, make sure we close the connector
-                LOG.info("XMPP server connector socket closed, closing connector");
-                close();
-            }
-
-            @Override
-            public void sessionOpened(IoSession session) throws Exception {
-                sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
-                sessionStateHolder.setState(SessionState.INITIATED);
-                Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-                
-                sessionContext.write(opener);
-            }
-        });
-        
-        XmppEndpointResolver resolver = new XmppEndpointResolver();
-        InetSocketAddress address = resolver.resolveXmppServer(otherServer.getDomain()).get(0).getAddress();
-        
-        LOG.debug("Connecting to XMPP server {} at {}", otherServer, address);
-        connector.connect(address);
+            LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + address.getAddress(), connectFuture.getException());
+            connector.dispose();
+            connector = null;
+        }
         
-        // make this method sync
-        // TODO handle timeout
-        try {
-            authenticatedLatch.await(20000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            // TODO handle
+        if(!successfullyConnected) {
+            throw new IOException("Failed to connect to XMPP server at " + otherServer);
         }
     }
     
+    private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+        NioSocketConnector connector = new NioSocketConnector();
+        DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
+        filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
+        filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
+        connector.setFilterChainBuilder(filterChainBuilder);
+        connector.setHandler(new ConnectorIoHandler(authenticatedLatch));
+        return connector;
+    }
+
+
+    
     private void startPinging() {
         pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
     }
@@ -212,4 +147,115 @@ public class XMPPServerConnector impleme
     public boolean isClosed() {
         return closed;
     }
+
+    private final class ConnectorIoHandler extends IoHandlerAdapter {
+        private final CountDownLatch authenticatedLatch;
+
+        private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
+            this.authenticatedLatch = authenticatedLatch;
+        }
+
+        @Override
+        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+            LOG.info("Exception thrown by XMPP server connector to {}, probably a bug in Vysper", otherServer);
+        }
+
+        @Override
+        public void messageReceived(IoSession session, Object message) throws Exception {
+            if(message == SslFilter.SESSION_SECURED) {
+                // TODO handle unsecure
+                // connection secured, send stream opener
+                sessionStateHolder.setState(SessionState.ENCRYPTED);
+                
+                LOG.info("XMPP server connector to {} secured using TLS", otherServer);
+                LOG.debug("XMPP server connector to {} restarting stream", otherServer);
+                
+                sessionContext.setIsReopeningXMLStream();
+                
+                Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
+                
+                sessionContext.write(opener);
+            } else {
+                Stanza msg = (Stanza) message;
+                
+                if(msg.getName().equals("stream")) {
+                    sessionContext.setSessionId(msg.getAttributeValue("id"));
+                } else if(msg.getName().equals("features")) {
+                    if(startTlsSupported(msg)) {
+                        LOG.info("XMPP server connector to {} is starting TLS", otherServer);
+                        Stanza startTlsStanza = new StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
+                        
+                        sessionContext.write(startTlsStanza);
+                        
+                    } else if(dialbackSupported(msg)) {
+                        Entity originating = serverRuntimeContext.getServerEnitity();
+   
+                        String dailbackId = new DailbackIdGenerator().generate(otherServer, originating, sessionContext.getSessionId());
+                        
+                        Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
+                            .addAttribute("from", originating.getDomain())
+                            .addAttribute("to", otherServer.getDomain())
+                            .addText(dailbackId)
+                            .build();
+                        
+                        sessionContext.write(dbResult);
+                    } else {
+                        throw new RuntimeException("Unsupported features");
+                    }
+                } else if(msg.getName().equals("result") && NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
+                    // TODO check and handle dailback result
+                    sessionStateHolder.setState(SessionState.AUTHENTICATED);
+                    
+                    LOG.info("XMPP server connector to {} authenticated using dialback", otherServer);
+                    authenticatedLatch.countDown();
+                    
+                    // connection established, start pinging
+                    startPinging();
+                } else if(msg.getName().equals("proceed") && NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) {
+                    sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
+                    
+                    LOG.debug("XMPP server connector to {} switching to TLS", otherServer);
+                    sessionContext.switchToTLS(false, true);
+                } else {
+                    // TODO other stanzas coming here?
+                }
+            }
+        }
+
+        private boolean startTlsSupported(Stanza stanza) {
+            return !stanza.getInnerElementsNamed("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
+        }
+
+        private boolean dialbackSupported(Stanza stanza) {
+            // TODO check for dialback namespace
+            return !stanza.getInnerElementsNamed("dialback", NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
+        }
+
+        @Override
+        public void sessionClosed(IoSession session) throws Exception {
+            // Socket was closed, make sure we close the connector
+            LOG.info("XMPP server connector socket closed, closing connector");
+            close();
+        }
+
+        @Override
+        public void sessionOpened(IoSession session) throws Exception {
+            sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
+            sessionStateHolder.setState(SessionState.INITIATED);
+            Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
+            
+            sessionContext.write(opener);
+        }
+    }
+
+    private class PingTask extends TimerTask {
+        public void run() {
+            XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
+            if(pingModule != null) {
+                pingModule.ping(XMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, XMPPServerConnector.this);
+            }
+        }
+    }
+    
+
 }

Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java Wed Dec 29 21:54:40 2010
@@ -1,4 +1,5 @@
 package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -14,7 +15,7 @@ public class XMPPServerConnectorRegistry
         this.serverRuntimeContext = serverRuntimeContext;
     }
 
-    public synchronized XMPPServerConnector getConnector(Entity server) {
+    public synchronized XMPPServerConnector getConnector(Entity server) throws IOException {
         XMPPServerConnector connector = connectors.get(server);
 
         if(connector != null && connector.isClosed()) {

Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java Wed Dec 29 21:54:40 2010
@@ -39,12 +39,6 @@ public class XmppEndpointResolver {
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        XmppEndpointResolver resolver = new XmppEndpointResolver();
-        System.out.println(resolver.resolveXmppServer("protocol7.com"));
-        
-    }
-    
     public List<ResolvedAddress> resolveXmppServer(String domain) {
         List<ResolvedAddress> addresses = new ArrayList<ResolvedAddress>();
         try {