You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ng...@apache.org on 2010/12/29 22:54:41 UTC
svn commit: r1053752 - in
/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s:
XMPPServerConnector.java XMPPServerConnectorRegistry.java
XmppEndpointResolver.java
Author: ngn
Date: Wed Dec 29 21:54:40 2010
New Revision: 1053752
URL: http://svn.apache.org/viewvc?rev=1053752&view=rev
Log:
Handle reconnecting to all registered addresses for an XMPP server, or failing if all fail
Modified:
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java Wed Dec 29 21:54:40 2010
@@ -1,11 +1,14 @@
package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
@@ -25,6 +28,7 @@ import org.apache.vysper.xmpp.server.Ser
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.XMPPVersion;
import org.apache.vysper.xmpp.server.response.ServerResponses;
+import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.apache.vysper.xmpp.writer.StanzaWriter;
@@ -39,8 +43,11 @@ public class XMPPServerConnector impleme
private MinaBackedSessionContext sessionContext;
private Entity otherServer;
private SessionStateHolder sessionStateHolder = new SessionStateHolder();
- private IoConnector connector = new NioSocketConnector();
+ private IoConnector connector;
+ private int connectTimeout = 30000;
+ private int xmppHandshakeTimeout = 30000;
+
private int pingPeriod = 30000;
private int pingTimeout = 10000;
@@ -48,138 +55,66 @@ public class XMPPServerConnector impleme
private Timer pingTimer = new Timer("pingtimer", true);
- private class PingTask extends TimerTask {
- public void run() {
- XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
- if(pingModule != null) {
- pingModule.ping(XMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, XMPPServerConnector.this);
- }
- }
- }
-
public XMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext) {
this.serverRuntimeContext = serverRuntimeContext;
this.otherServer = otherServer;
-
- DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
- filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
- filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
- connector.setFilterChainBuilder(filterChainBuilder);
}
- public synchronized void start() {
+ public synchronized void start() throws IOException {
LOG.info("Starting XMPP server connector to {}", otherServer);
+
+ // make this method synchronous
final CountDownLatch authenticatedLatch = new CountDownLatch(1);
- connector.setHandler(new IoHandlerAdapter() {
- @Override
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
- LOG.info("Exception thrown by XMPP server connector to {}, probably a bug in Vysper", otherServer);
- }
-
- @Override
- public void messageReceived(IoSession session, Object message) throws Exception {
- if(message == SslFilter.SESSION_SECURED) {
- // TODO handle unsecure
- // connection secured, send stream opener
- sessionStateHolder.setState(SessionState.ENCRYPTED);
-
- LOG.info("XMPP server connector to {} secured using TLS", otherServer);
- LOG.debug("XMPP server connector to {} restarting stream", otherServer);
-
- sessionContext.setIsReopeningXMLStream();
-
- Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- } else {
- Stanza msg = (Stanza) message;
-
- if(msg.getName().equals("stream")) {
- sessionContext.setSessionId(msg.getAttributeValue("id"));
- } else if(msg.getName().equals("features")) {
- if(startTlsSupported(msg)) {
- LOG.info("XMPP server connector to {} is starting TLS", otherServer);
- Stanza startTlsStanza = new StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
-
- sessionContext.write(startTlsStanza);
-
- } else if(dialbackSupported(msg)) {
- Entity originating = serverRuntimeContext.getServerEnitity();
-
- String dailbackId = new DailbackIdGenerator().generate(otherServer, originating, sessionContext.getSessionId());
-
- Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
- .addAttribute("from", originating.getDomain())
- .addAttribute("to", otherServer.getDomain())
- .addText(dailbackId)
- .build();
-
- sessionContext.write(dbResult);
- } else {
- throw new RuntimeException("Unsupported features");
- }
- } else if(msg.getName().equals("result") && NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
- // TODO check and handle dailback result
- sessionStateHolder.setState(SessionState.AUTHENTICATED);
-
- LOG.info("XMPP server connector to {} authenticated using dialback", otherServer);
- authenticatedLatch.countDown();
-
- // connection established, start pinging
- startPinging();
- } else if(msg.getName().equals("proceed") && NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) {
- sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
-
- LOG.debug("XMPP server connector to {} switching to TLS", otherServer);
- sessionContext.switchToTLS(false, true);
+ boolean successfullyConnected = false;
+
+ XmppEndpointResolver resolver = new XmppEndpointResolver();
+ List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
+
+ for(ResolvedAddress address : addresses) {
+ LOG.info("Connecting to XMPP server {} at {}", otherServer, address.getAddress());
+
+ connector = createConnector(authenticatedLatch);
+ ConnectFuture connectFuture = connector.connect(address.getAddress());
+ if(connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
+ // success on the TCP/IP level, now wait for the XMPP handshake
+
+ try {
+ if(authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
+ // success, break out of connect loop
+ successfullyConnected = true;
+ break;
} else {
- // TODO other stanzas coming here?
+ // attempt next
+ LOG.warn("XMPP handshake with {} at () timed out", otherServer, address.getAddress());
}
+ } catch (InterruptedException e) {
+ throw new IOException("Connection to " + otherServer + " was interrupted", e);
}
- }
-
- private boolean startTlsSupported(Stanza stanza) {
- return !stanza.getInnerElementsNamed("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
- }
-
- private boolean dialbackSupported(Stanza stanza) {
- // TODO check for dialback namespace
- return !stanza.getInnerElementsNamed("dialback", NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
- }
+ }
- @Override
- public void sessionClosed(IoSession session) throws Exception {
- // Socket was closed, make sure we close the connector
- LOG.info("XMPP server connector socket closed, closing connector");
- close();
- }
-
- @Override
- public void sessionOpened(IoSession session) throws Exception {
- sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
- sessionStateHolder.setState(SessionState.INITIATED);
- Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- }
- });
-
- XmppEndpointResolver resolver = new XmppEndpointResolver();
- InetSocketAddress address = resolver.resolveXmppServer(otherServer.getDomain()).get(0).getAddress();
-
- LOG.debug("Connecting to XMPP server {} at {}", otherServer, address);
- connector.connect(address);
+ LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + address.getAddress(), connectFuture.getException());
+ connector.dispose();
+ connector = null;
+ }
- // make this method sync
- // TODO handle timeout
- try {
- authenticatedLatch.await(20000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // TODO handle
+ if(!successfullyConnected) {
+ throw new IOException("Failed to connect to XMPP server at " + otherServer);
}
}
+ private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+ NioSocketConnector connector = new NioSocketConnector();
+ DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
+ filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
+ filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
+ connector.setFilterChainBuilder(filterChainBuilder);
+ connector.setHandler(new ConnectorIoHandler(authenticatedLatch));
+ return connector;
+ }
+
+
+
private void startPinging() {
pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
}
@@ -212,4 +147,115 @@ public class XMPPServerConnector impleme
public boolean isClosed() {
return closed;
}
+
+ private final class ConnectorIoHandler extends IoHandlerAdapter {
+ private final CountDownLatch authenticatedLatch;
+
+ private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
+ this.authenticatedLatch = authenticatedLatch;
+ }
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ LOG.info("Exception thrown by XMPP server connector to {}, probably a bug in Vysper", otherServer);
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ if(message == SslFilter.SESSION_SECURED) {
+ // TODO handle unsecure
+ // connection secured, send stream opener
+ sessionStateHolder.setState(SessionState.ENCRYPTED);
+
+ LOG.info("XMPP server connector to {} secured using TLS", otherServer);
+ LOG.debug("XMPP server connector to {} restarting stream", otherServer);
+
+ sessionContext.setIsReopeningXMLStream();
+
+ Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
+
+ sessionContext.write(opener);
+ } else {
+ Stanza msg = (Stanza) message;
+
+ if(msg.getName().equals("stream")) {
+ sessionContext.setSessionId(msg.getAttributeValue("id"));
+ } else if(msg.getName().equals("features")) {
+ if(startTlsSupported(msg)) {
+ LOG.info("XMPP server connector to {} is starting TLS", otherServer);
+ Stanza startTlsStanza = new StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
+
+ sessionContext.write(startTlsStanza);
+
+ } else if(dialbackSupported(msg)) {
+ Entity originating = serverRuntimeContext.getServerEnitity();
+
+ String dailbackId = new DailbackIdGenerator().generate(otherServer, originating, sessionContext.getSessionId());
+
+ Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
+ .addAttribute("from", originating.getDomain())
+ .addAttribute("to", otherServer.getDomain())
+ .addText(dailbackId)
+ .build();
+
+ sessionContext.write(dbResult);
+ } else {
+ throw new RuntimeException("Unsupported features");
+ }
+ } else if(msg.getName().equals("result") && NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
+ // TODO check and handle dailback result
+ sessionStateHolder.setState(SessionState.AUTHENTICATED);
+
+ LOG.info("XMPP server connector to {} authenticated using dialback", otherServer);
+ authenticatedLatch.countDown();
+
+ // connection established, start pinging
+ startPinging();
+ } else if(msg.getName().equals("proceed") && NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) {
+ sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
+
+ LOG.debug("XMPP server connector to {} switching to TLS", otherServer);
+ sessionContext.switchToTLS(false, true);
+ } else {
+ // TODO other stanzas coming here?
+ }
+ }
+ }
+
+ private boolean startTlsSupported(Stanza stanza) {
+ return !stanza.getInnerElementsNamed("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
+ }
+
+ private boolean dialbackSupported(Stanza stanza) {
+ // TODO check for dialback namespace
+ return !stanza.getInnerElementsNamed("dialback", NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ // Socket was closed, make sure we close the connector
+ LOG.info("XMPP server connector socket closed, closing connector");
+ close();
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
+ sessionStateHolder.setState(SessionState.INITIATED);
+ Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
+
+ sessionContext.write(opener);
+ }
+ }
+
+ private class PingTask extends TimerTask {
+ public void run() {
+ XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
+ if(pingModule != null) {
+ pingModule.ping(XMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, XMPPServerConnector.this);
+ }
+ }
+ }
+
+
}
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java Wed Dec 29 21:54:40 2010
@@ -1,4 +1,5 @@
package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -14,7 +15,7 @@ public class XMPPServerConnectorRegistry
this.serverRuntimeContext = serverRuntimeContext;
}
- public synchronized XMPPServerConnector getConnector(Entity server) {
+ public synchronized XMPPServerConnector getConnector(Entity server) throws IOException {
XMPPServerConnector connector = connectors.get(server);
if(connector != null && connector.isClosed()) {
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java Wed Dec 29 21:54:40 2010
@@ -39,12 +39,6 @@ public class XmppEndpointResolver {
}
}
- public static void main(String[] args) throws Exception {
- XmppEndpointResolver resolver = new XmppEndpointResolver();
- System.out.println(resolver.resolveXmppServer("protocol7.com"));
-
- }
-
public List<ResolvedAddress> resolveXmppServer(String domain) {
List<ResolvedAddress> addresses = new ArrayList<ResolvedAddress>();
try {