You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by be...@apache.org on 2013/07/02 16:58:38 UTC
[3/3] git commit: VYSPER-344: overall improvements of
server-to-server: add logging, harden routing,
exception handling. fix receiving of keep-alive pings
VYSPER-344: overall improvements of server-to-server: add logging, harden routing, exception handling.
fix receiving of keep-alive pings
Project: http://git-wip-us.apache.org/repos/asf/mina-vysper/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-vysper/commit/ef2276ca
Tree: http://git-wip-us.apache.org/repos/asf/mina-vysper/tree/ef2276ca
Diff: http://git-wip-us.apache.org/repos/asf/mina-vysper/diff/ef2276ca
Branch: refs/heads/master
Commit: ef2276cad7d6fbbc5cf41ea025d5e2751a21f38d
Parents: 28ca5c5
Author: Bernd Fondermann <be...@brainlounge.de>
Authored: Tue Jul 2 16:57:29 2013 +0200
Committer: Bernd Fondermann <be...@brainlounge.de>
Committed: Tue Jul 2 16:57:29 2013 +0200
----------------------------------------------------------------------
.../vysper/mina/XmppIoHandlerAdapter.java | 2 +-
.../server/s2s/DefaultXMPPServerConnector.java | 302 ++++++++-----------
.../s2s/DefaultXMPPServerConnectorRegistry.java | 21 +-
.../server/s2s/ServerConnectorIoHandler.java | 81 +++++
.../xmpp/server/s2s/XMPPServerConnector.java | 6 +
5 files changed, 239 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java b/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
index db1f0b7..6332033 100644
--- a/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
+++ b/server/core/src/main/java/org/apache/vysper/mina/XmppIoHandlerAdapter.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
import org.xml.sax.SAXParseException;
/**
- *
+ * handler for client-to-server sessions
* @author The Apache MINA Project (dev@mina.apache.org)
*/
public class XmppIoHandlerAdapter implements IoHandler {
http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
index 6752ee1..690e069 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnector.java
@@ -18,7 +18,6 @@
*
*/
package org.apache.vysper.xmpp.server.s2s;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.Arrays;
@@ -31,10 +30,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.vysper.mina.MinaBackedSessionContext;
import org.apache.vysper.mina.StanzaLoggingFilter;
@@ -48,6 +45,7 @@ import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbResult
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbVerifyHandler;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DialbackIdGenerator;
import org.apache.vysper.xmpp.protocol.NamespaceURIs;
+import org.apache.vysper.xmpp.protocol.ProtocolException;
import org.apache.vysper.xmpp.protocol.ResponseStanzaContainer;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -70,10 +68,17 @@ import org.slf4j.LoggerFactory;
public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerConnector {
private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnector.class);
+
+ private final static List<StanzaHandler> S2S_HANDSHAKE_HANDLERS = Arrays.asList(
+ new DbVerifyHandler(),
+ new DbResultHandler(),
+ new TlsProceedHandler(),
+ new FeaturesHandler()
+ );
private ServerRuntimeContext serverRuntimeContext;
private MinaBackedSessionContext sessionContext;
- private Entity otherServer;
+ private Entity remoteServer;
private SessionStateHolder sessionStateHolder = new SessionStateHolder();
private IoConnector connector;
@@ -89,10 +94,12 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
private SessionStateHolder dialbackSessionStateHolder;
private Timer pingTimer;
-
- public DefaultXMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
+ protected ServerConnectorIoHandler serverConnectorIoHandler;
+ protected final CountDownLatch authenticatedLatch = new CountDownLatch(1);
+
+ public DefaultXMPPServerConnector(Entity remoteServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
this.serverRuntimeContext = serverRuntimeContext;
- this.otherServer = otherServer;
+ this.remoteServer = remoteServer;
this.dialbackSessionContext = dialbackSessionContext;
this.dialbackSessionStateHolder = dialbackSessionStateHolder;
}
@@ -101,29 +108,26 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
* Connect and authenticate the XMPP server connector
*/
public synchronized void start() throws RemoteServerNotFoundException, RemoteServerTimeoutException {
- LOG.info("Starting XMPP server connector to {}", otherServer);
+ LOG.info("Starting XMPP server connector to {}", remoteServer);
- // make this method synchronous
- final CountDownLatch authenticatedLatch = new CountDownLatch(1);
-
boolean successfullyConnected = false;
XmppEndpointResolver resolver = new XmppEndpointResolver();
- List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
+ List<ResolvedAddress> addresses = resolver.resolveXmppServer(remoteServer.getDomain());
Throwable lastException = null;
if(!addresses.isEmpty()) {
- LOG.info("resolved {} address(es) for {}", addresses.size(), otherServer);
+ LOG.info("resolved {} address(es) for {}", addresses.size(), remoteServer);
for(ResolvedAddress address : addresses) {
final InetSocketAddress ipAddress = address.getAddress();
- LOG.info("Connecting to XMPP server {} at {}", otherServer, ipAddress);
+ LOG.info("Connecting to XMPP server {} at {}", remoteServer, ipAddress);
- connector = createConnector(authenticatedLatch);
+ connector = createConnector();
ConnectFuture connectFuture = connector.connect(ipAddress);
if(connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
// success on the TCP/IP level, now wait for the XMPP handshake
- LOG.info("XMPP server {} connected at {}", otherServer, ipAddress);
+ LOG.info("XMPP server {} connected at {}", remoteServer, ipAddress);
try {
if(authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
// success, break out of connect loop
@@ -131,15 +135,15 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
break;
} else {
// attempt next
- LOG.warn("XMPP handshake with {} at {} timed out", otherServer, ipAddress);
+ LOG.warn("XMPP handshake with {} at {} timed out", remoteServer, ipAddress);
}
} catch (InterruptedException e) {
- throw new RemoteServerTimeoutException("Connection to " + otherServer + " was interrupted", e);
+ throw new RemoteServerTimeoutException("Connection to " + remoteServer + " was interrupted", e);
}
}
lastException = connectFuture.getException();
- LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + ipAddress, connectFuture.getException());
+ LOG.warn("Failed connecting to XMPP server " + remoteServer + " at " + ipAddress, connectFuture.getException());
disposeAndNullifyConnector();
}
} else {
@@ -148,7 +152,7 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
}
if(!successfullyConnected) {
- String exceptionMsg = "Failed to connect to XMPP server at " + otherServer;
+ String exceptionMsg = "Failed to connect to XMPP server at " + remoteServer;
if(lastException instanceof UnresolvedAddressException) {
throw new RemoteServerNotFoundException(exceptionMsg);
@@ -166,13 +170,14 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
connector = null;
}
- private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+ private NioSocketConnector createConnector() {
NioSocketConnector connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
connector.setFilterChainBuilder(filterChainBuilder);
- connector.setHandler(new ConnectorIoHandler(authenticatedLatch));
+ serverConnectorIoHandler = new ServerConnectorIoHandler(remoteServer, this);
+ connector.setHandler(serverConnectorIoHandler);
return connector;
}
@@ -184,6 +189,105 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
}
}
+ private StanzaHandler lookupS2SHandler(Stanza stanza) {
+ for (StanzaHandler handler : S2S_HANDSHAKE_HANDLERS) {
+ if (handler.verify(stanza)) {
+ return handler;
+ }
+ }
+ return null;
+ }
+
+ public void handleReceivedStanza(Stanza stanza) {
+
+ // check for basic stanza handlers
+ StanzaHandler s2sHandler = lookupS2SHandler(stanza);
+
+ if(s2sHandler != null) {
+ ResponseStanzaContainer container;
+ try {
+ container = s2sHandler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
+ } catch (ProtocolException e) {
+ return;
+ }
+ if(container != null && container.hasResponse()) {
+ sessionContext.write(container.getResponseStanza());
+ }
+
+ if(sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
+ LOG.info("XMPP server connector to {} authenticated", remoteServer);
+ authenticatedLatch.countDown();
+
+ // connection established, start pinging
+ startPinging();
+ }
+ // none of the handlers matched, stream start is handled separately
+ } else if(stanza.getName().equals("stream")) {
+ sessionContext.setSessionId(stanza.getAttributeValue("id"));
+ sessionContext.setInitiatingEntity(remoteServer);
+
+ String version = stanza.getAttributeValue("version");
+ if(version == null) {
+ // old protocol, assume dialback
+ String dailbackId = new DialbackIdGenerator().generate(remoteServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());
+
+ Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
+ .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
+ .addAttribute("to", remoteServer.getDomain())
+ .addText(dailbackId)
+ .build();
+ write(dbResult);
+ }
+
+ if(dialbackSessionContext != null) {
+ // connector is being used for dialback verification, don't do further authentication
+ sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
+ sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);
+
+ sessionContext.setInitiatingEntity(remoteServer);
+ sessionStateHolder.setState(SessionState.AUTHENTICATED);
+ authenticatedLatch.countDown();
+ }
+ } else {
+
+ if(sessionStateHolder.getState() != SessionState.AUTHENTICATED) {
+ LOG.warn("regular stanza sent before s2s session to {} was authenticated, closing", remoteServer);
+ sessionContext.close();
+ return;
+ }
+            // only handle stanzas addressed directly to this server
+ if (!serverRuntimeContext.getServerEnitity().equals(stanza.getTo())) {
+ LOG.info("not handling messages to clients here received from {} to {}", remoteServer, stanza.getTo());
+ sessionContext.close();
+ return;
+ }
+
+ serverRuntimeContext.getStanzaProcessor().processStanza(serverRuntimeContext, sessionContext, stanza, sessionStateHolder);
+ }
+ }
+
+ public void handleSessionSecured() {
+ // connection secured, send stream opener
+ sessionStateHolder.setState(SessionState.ENCRYPTED);
+
+ LOG.info("XMPP server connector to {} secured using TLS", remoteServer);
+ LOG.debug("XMPP server connector to {} restarting stream", remoteServer);
+
+ sessionContext.setIsReopeningXMLStream();
+
+ Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), remoteServer, XMPPVersion.VERSION_1_0, sessionContext);
+
+ sessionContext.write(opener);
+ }
+
+ public void handleSessionOpened(IoSession session) {
+ LOG.info("XMPP server session opened to {}", remoteServer);
+ sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
+ sessionStateHolder.setState(SessionState.INITIATED);
+ Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), remoteServer, XMPPVersion.VERSION_1_0, sessionContext);
+ sessionContext.write(opener);
+ }
+
/**
* {@inheritDoc}
*/
@@ -195,13 +299,14 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
* {@inheritDoc}
*/
public void close() {
+ LOG.info("XMPP server connector socket closed, closing connector (current status: {})", closed ? "closed" : "open");
try {
if(!closed) {
- LOG.info("XMPP server connector to {} closing", otherServer);
+ LOG.info("XMPP server connector to {} is closing", remoteServer);
if (pingTimer != null) pingTimer.cancel();
sessionContext.close();
disposeAndNullifyConnector();
- LOG.info("XMPP server connector to {} closed", otherServer);
+ LOG.info("XMPP server connector to {} is closed", remoteServer);
}
} finally {
closed = true;
@@ -219,7 +324,7 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
* {@inheritDoc}
*/
public void timeout() {
- LOG.debug("XMPP server connector to {} timed out, closing", otherServer);
+ LOG.debug("XMPP server connector to {} timed out, closing", remoteServer);
close();
}
@@ -231,153 +336,12 @@ public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerC
return closed;
}
- private final class ConnectorIoHandler extends IoHandlerAdapter {
-
- private final List<StanzaHandler> handlers = Arrays.asList(
- new DbVerifyHandler(),
- new DbResultHandler(),
- new TlsProceedHandler(),
- new FeaturesHandler()
- );
-
- private final CountDownLatch authenticatedLatch;
-
- private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
- this.authenticatedLatch = authenticatedLatch;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
- if (cause instanceof IOException) {
- if (cause instanceof javax.net.ssl.SSLHandshakeException) {
- LOG.warn("failed to complete SSL handshake with server {}: {}", otherServer, cause.getMessage());
- } else if (cause instanceof javax.net.ssl.SSLException) {
- LOG.warn("failure in SSL with server {}: {}", otherServer, cause.getMessage());
- } else {
- LOG.info("I/O exception with server {}: {}", otherServer, cause.getMessage());
- }
- close();
- } else {
- LOG.warn("Exception {} thrown by XMPP server connector to " + otherServer + ", probably a bug in Vysper: {}", cause.getClass().getName(), cause.getMessage());
- }
- }
-
- private StanzaHandler lookupHandler(Stanza stanza) {
- for (StanzaHandler handler : handlers) {
- if (handler.verify(stanza)) {
- return handler;
- }
- }
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void messageReceived(IoSession session, Object message) throws Exception {
- if(message == SslFilter.SESSION_SECURED) {
- // connection secured, send stream opener
- sessionStateHolder.setState(SessionState.ENCRYPTED);
-
- LOG.info("XMPP server connector to {} secured using TLS", otherServer);
- LOG.debug("XMPP server connector to {} restarting stream", otherServer);
-
- sessionContext.setIsReopeningXMLStream();
-
- Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- } else if(message == SslFilter.SESSION_UNSECURED) {
- // unsecured, closing
- close();
- } else if(message instanceof Stanza) {
- Stanza stanza = (Stanza) message;
-
- // check for basic stanza handlers
- StanzaHandler handler = lookupHandler(stanza);
-
- if(handler != null) {
- ResponseStanzaContainer container = handler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
- if(container != null && container.hasResponse()) {
- sessionContext.write(container.getResponseStanza());
- }
-
- if(sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
- LOG.info("XMPP server connector to {} authenticated", otherServer);
- authenticatedLatch.countDown();
-
- // connection established, start pinging
- startPinging();
- }
- // none of the handlers matched, stream start is handled separately
- } else if(stanza.getName().equals("stream")) {
- sessionContext.setSessionId(stanza.getAttributeValue("id"));
- sessionContext.setInitiatingEntity(otherServer);
-
- String version = stanza.getAttributeValue("version");
- if(version == null) {
- // old protocol, assume dialback
- String dailbackId = new DialbackIdGenerator().generate(otherServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());
-
- Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
- .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
- .addAttribute("to", otherServer.getDomain())
- .addText(dailbackId)
- .build();
- write(dbResult);
- }
-
- if(dialbackSessionContext != null) {
- // connector is being used for dialback verification, don't do further authentication
- sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
- sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);
-
- sessionContext.setInitiatingEntity(otherServer);
- sessionStateHolder.setState(SessionState.AUTHENTICATED);
- authenticatedLatch.countDown();
- }
- } else {
- // TODO other stanzas coming here?
- if (message != null) LOG.warn("unhandled stanza in S2S ConnectorIoHandler: " + message);
- }
- } else {
- throw new RuntimeException("Only handles SSL events and stanzas, got: " + message.getClass());
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void sessionClosed(IoSession session) throws Exception {
- // Socket was closed, make sure we close the connector
- LOG.info("XMPP server connector socket closed, closing connector");
- close();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void sessionOpened(IoSession session) throws Exception {
- LOG.info("XMPP server session opened to {}", otherServer);
- sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
- sessionStateHolder.setState(SessionState.INITIATED);
- Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- }
- }
-
private class PingTask extends TimerTask {
+ @Override
public void run() {
XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
- LOG.info("pinging federated XMPP server {}", otherServer);
- pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, DefaultXMPPServerConnector.this);
+ LOG.info("pinging federated XMPP server {}", remoteServer);
+ pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), remoteServer, pingTimeout, DefaultXMPPServerConnector.this);
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
index bda53c4..3cbec0d 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/DefaultXMPPServerConnectorRegistry.java
@@ -28,6 +28,8 @@ import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link XMPPServerConnectorRegistry}
@@ -36,6 +38,8 @@ import org.apache.vysper.xmpp.server.SessionContext;
*/
public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnectorRegistry.class);
+
private ServerRuntimeContext serverRuntimeContext;
private Map<Entity, XMPPServerConnector> connectors = new ConcurrentHashMap<Entity, XMPPServerConnector>();
@@ -56,10 +60,14 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
}
if(connector == null) {
+ LOG.debug("starting s2s connector to " + server);
connector = createConnector(server, serverRuntimeContext, null, null);
- connector.start();
+ // TODO start async, because we are in a synchronized method
+ connector.start(); // this can take some time to complete
+ LOG.debug("started s2s connector to " + server);
connectors.put(server, connector);
+ LOG.debug("s2s connector registry size is now " + connectors.size());
}
return connector;
@@ -67,8 +75,10 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
@SpecCompliant(spec = "draft-ietf-xmpp-3920bis-22", section = "10.4", status = SpecCompliant.ComplianceStatus.FINISHED, coverage = SpecCompliant.ComplianceCoverage.COMPLETE)
public synchronized XMPPServerConnector connectForDialback(Entity server, SessionContext orginalSessionContext, SessionStateHolder originalSessionStateHolder) throws RemoteServerNotFoundException, RemoteServerTimeoutException {
+ LOG.debug("starting s2s connector for dialback to " + server);
XMPPServerConnector connector = createConnector(server, serverRuntimeContext, orginalSessionContext, originalSessionStateHolder);
connector.start();
+ LOG.debug("started s2s connector for dialback to " + server);
return connector;
}
@@ -79,9 +89,14 @@ public class DefaultXMPPServerConnectorRegistry implements XMPPServerConnectorRe
/* (non-Javadoc)
* @see org.apache.vysper.xmpp.server.s2s.XMPPServerConnectorRegistry#close()
*/
- public void close() {
+ public synchronized void close() {
+ LOG.debug("closing now all {} s2s connectors", connectors.size());
for(XMPPServerConnector connector : connectors.values()) {
- connector.close();
+ try {
+ connector.close();
+ } catch (Throwable e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
new file mode 100644
index 0000000..e1ed160
--- /dev/null
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/ServerConnectorIoHandler.java
@@ -0,0 +1,81 @@
+package org.apache.vysper.xmpp.server.s2s;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.ssl.SslFilter;
+import org.apache.vysper.xmpp.addressing.Entity;
+import org.apache.vysper.xmpp.stanza.Stanza;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * handler for server-to-server connections
+*/
+public class ServerConnectorIoHandler extends IoHandlerAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServerConnectorIoHandler.class);
+
+ protected final Entity remoteServer;
+ protected final XMPPServerConnector serverConnector;
+
+ ServerConnectorIoHandler(Entity remoteServer, XMPPServerConnector serverConnector) {
+ this.remoteServer = remoteServer;
+ this.serverConnector = serverConnector;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ if (cause instanceof IOException) {
+ if (cause instanceof javax.net.ssl.SSLHandshakeException) {
+ LOG.warn("failed to complete SSL handshake with server {}: {}", remoteServer, cause.getMessage());
+ } else if (cause instanceof javax.net.ssl.SSLException) {
+ LOG.warn("failure in SSL with server {}: {}", remoteServer, cause.getMessage());
+ } else {
+ LOG.info("I/O exception with server {}: {}", remoteServer, cause.getMessage());
+ }
+ serverConnector.close();
+ } else {
+ LOG.warn("Exception {} thrown by XMPP server connector to " + remoteServer + ", probably a bug in Vysper: {}", cause.getClass().getName(), cause.getMessage());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ if(message == SslFilter.SESSION_SECURED) {
+ serverConnector.handleSessionSecured();
+ } else if(message == SslFilter.SESSION_UNSECURED) {
+ // unsecured, closing
+ serverConnector.close();
+ } else if(message instanceof Stanza) {
+ Stanza stanza = (Stanza) message;
+ serverConnector.handleReceivedStanza(stanza);
+ } else {
+ throw new RuntimeException("Only handles SSL events and stanzas, got: " + message.getClass());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ // Socket was closed, make sure we close the connector
+ serverConnector.close();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ serverConnector.handleSessionOpened(session);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina-vysper/blob/ef2276ca/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
----------------------------------------------------------------------
diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
index f7308c9..7670e3f 100644
--- a/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
+++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
@@ -19,6 +19,7 @@
*/
package org.apache.vysper.xmpp.server.s2s;
+import org.apache.mina.core.session.IoSession;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerNotFoundException;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
import org.apache.vysper.xmpp.stanza.Stanza;
@@ -49,4 +50,9 @@ public interface XMPPServerConnector extends StanzaWriter {
*/
void write(Stanza stanza);
+ void handleReceivedStanza(Stanza stanza);
+
+ void handleSessionSecured();
+
+ void handleSessionOpened(IoSession session);
}
\ No newline at end of file