You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/08/27 09:20:53 UTC
svn commit: r1377586 - in /camel/trunk/components/camel-xmpp/src:
main/java/org/apache/camel/component/xmpp/
test/java/org/apache/camel/component/xmpp/
Author: davsclaus
Date: Mon Aug 27 07:20:52 2012
New Revision: 1377586
URL: http://svn.apache.org/viewvc?rev=1377586&view=rev
Log:
CAMEL-4224: Added options to lazily connect and to repair connections to the XMPP server. Thanks to Rich Newcomb for the patch.
Added:
camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java (with props)
camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java (with props)
camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java (with props)
Modified:
camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java
camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java
Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java?rev=1377586&r1=1377585&r2=1377586&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java Mon Aug 27 07:20:52 2012
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,11 +39,11 @@ public class XmppComponent extends Defau
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
if (endpointCache.containsKey(uri)) {
- LOG.debug("Using cached endpoint for URI {}", uri);
+ LOG.debug("Using cached endpoint for URI {}", URISupport.sanitizeUri(uri));
return endpointCache.get(uri);
}
- LOG.debug("Creating new endpoint for URI {}", uri);
+ LOG.debug("Creating new endpoint for URI {}", URISupport.sanitizeUri(uri));
XmppEndpoint endpoint = new XmppEndpoint(uri, this);
URI u = new URI(uri);
Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java?rev=1377586&r1=1377585&r2=1377586&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java Mon Aug 27 07:20:52 2012
@@ -16,9 +16,12 @@
*/
package org.apache.camel.component.xmpp;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.URISupport;
import org.jivesoftware.smack.Chat;
import org.jivesoftware.smack.ChatManager;
import org.jivesoftware.smack.ChatManagerListener;
@@ -26,6 +29,7 @@ import org.jivesoftware.smack.MessageLis
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.filter.ToContainsFilter;
@@ -49,6 +53,7 @@ public class XmppConsumer extends Defaul
private Chat privateChat;
private ChatManager chatManager;
private XMPPConnection connection;
+ private ScheduledExecutorService scheduledExecutor;
public XmppConsumer(XmppEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -57,7 +62,18 @@ public class XmppConsumer extends Defaul
@Override
protected void doStart() throws Exception {
- connection = endpoint.createConnection();
+ try {
+ connection = endpoint.createConnection();
+ } catch (XMPPException e) {
+ if (endpoint.isTestConnectionOnStartup()) {
+ throw new RuntimeException("Could not connect to XMPP server.", e);
+ } else {
+ LOG.warn(XmppEndpoint.getXmppExceptionLogMessage(e));
+ scheduleDelayedStart();
+ return;
+ }
+ }
+
chatManager = connection.getChatManager();
chatManager.addChatListener(this);
@@ -93,9 +109,59 @@ public class XmppConsumer extends Defaul
}
}
+ this.startRobustConnectionMonitor();
super.doStart();
}
+ /**
+  * Schedules one deferred call to {@code doStart()} on the consumer's scheduled executor.
+  * The call runs after {@code endpoint.getConnectionPollDelay()} seconds. If that attempt
+  * cannot connect either and the endpoint does not test the connection on startup,
+  * {@code doStart()} schedules a further attempt itself.
+  *
+  * @throws Exception declared for subclasses; this implementation only schedules the task
+  */
+ protected void scheduleDelayedStart() throws Exception {
+     Runnable startRunnable = new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 doStart();
+             } catch (Exception e) {
+                 // hand the exception to the logger so its stack trace and cause are kept
+                 LOG.error("Delayed start of XMPP consumer failed: " + e.getMessage(), e);
+             }
+         }
+     };
+     LOG.info("Delaying XMPP consumer startup for endpoint {}. Trying again in {} seconds.",
+             URISupport.sanitizeUri(endpoint.getEndpointUri()), endpoint.getConnectionPollDelay());
+     getExecutor().schedule(startRunnable, endpoint.getConnectionPollDelay(), TimeUnit.SECONDS);
+ }
+
+ /**
+  * Starts the periodic task that calls {@code checkConnection()}. The first run happens after
+  * {@code endpoint.getConnectionPollDelay()} seconds and the task repeats at that same period.
+  *
+  * @throws Exception declared by the original signature; scheduling itself is the only work done here
+  */
+ private void startRobustConnectionMonitor() throws Exception {
+     Runnable connectionCheckRunnable = new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 checkConnection();
+             } catch (Exception e) {
+                 // catching here keeps the periodic task alive; log with the throwable so the
+                 // stack trace is not lost
+                 LOG.error("XMPP connection check failed: " + e.getMessage(), e);
+             }
+         }
+     };
+     // background thread to detect and repair lost connections
+     getExecutor().scheduleAtFixedRate(connectionCheckRunnable, endpoint.getConnectionPollDelay(),
+             endpoint.getConnectionPollDelay(), TimeUnit.SECONDS);
+ }
+
+ /**
+  * Reconnects the consumer's connection when it reports that it is no longer connected.
+  * A failed reconnect is logged as a warning and not rethrown.
+  */
+ private void checkConnection() throws Exception {
+     if (connection.isConnected()) {
+         // still connected, nothing to repair
+         return;
+     }
+     LOG.info("Attempting to reconnect to: {}", XmppEndpoint.getConnectionMessage(connection));
+     try {
+         connection.connect();
+     } catch (XMPPException reconnectFailure) {
+         LOG.warn(XmppEndpoint.getXmppExceptionLogMessage(reconnectFailure));
+     }
+ }
+ /**
+  * Returns the scheduled executor used for delayed starts and connection checks,
+  * creating it through the Camel context's executor service manager on first use.
+  */
+ private ScheduledExecutorService getExecutor() {
+     if (scheduledExecutor != null) {
+         return scheduledExecutor;
+     }
+     scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
+             .newSingleThreadScheduledExecutor(this, "connectionPoll");
+     return scheduledExecutor;
+ }
+
@Override
protected void doStop() throws Exception {
super.doStop();
@@ -110,6 +176,10 @@ public class XmppConsumer extends Defaul
if (connection != null && connection.isConnected()) {
connection.disconnect();
}
+ if (scheduledExecutor != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
+ scheduledExecutor = null;
+ }
}
public void chatCreated(Chat chat, boolean createdLocally) {
Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java?rev=1377586&r1=1377585&r2=1377586&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java Mon Aug 27 07:20:52 2012
@@ -36,6 +36,7 @@ import org.jivesoftware.smack.XMPPExcept
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
+import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smackx.muc.MultiUserChat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +62,8 @@ public class XmppEndpoint extends Defaul
private String nickname;
private String serviceName;
private XMPPConnection connection;
+ private boolean testConnectionOnStartup = true;
+ private int connectionPollDelay = 10;
public XmppEndpoint() {
}
@@ -112,7 +115,7 @@ public class XmppEndpoint extends Defaul
exchange.setIn(new XmppMessage(message));
return exchange;
}
-
+
@Override
protected String createEndpointUri() {
return "xmpp://" + host + ":" + port + "/" + getParticipant() + "?serviceName=" + serviceName;
@@ -122,20 +125,22 @@ public class XmppEndpoint extends Defaul
return true;
}
- public XMPPConnection createConnection() throws XMPPException {
+ public synchronized XMPPConnection createConnection() throws XMPPException {
- if (connection != null) {
+ if (connection != null && connection.isConnected()) {
return connection;
}
- if (port > 0) {
- if (getServiceName() == null) {
- connection = new XMPPConnection(new ConnectionConfiguration(host, port));
+ if (connection == null) {
+ if (port > 0) {
+ if (getServiceName() == null) {
+ connection = new XMPPConnection(new ConnectionConfiguration(host, port));
+ } else {
+ connection = new XMPPConnection(new ConnectionConfiguration(host, port, serviceName));
+ }
} else {
- connection = new XMPPConnection(new ConnectionConfiguration(host, port, serviceName));
+ connection = new XMPPConnection(host);
}
- } else {
- connection = new XMPPConnection(host);
}
connection.connect();
@@ -183,7 +188,6 @@ public class XmppEndpoint extends Defaul
return connection;
}
-
/*
* If there is no "@" symbol in the room, find the chat service JID and
* return fully qualified JID for the room as room@conference.server.domain
@@ -206,10 +210,29 @@ public class XmppEndpoint extends Defaul
return room + "@" + chatServer;
}
+ /**
+  * Describes the configured connection target as {@code host:port/serviceName}.
+  * It is built from the endpoint's own settings, so it can be used before a connection exists.
+  */
+ public String getConnectionDescription() {
+     StringBuilder description = new StringBuilder();
+     description.append(host).append(":").append(port);
+     description.append("/").append(serviceName);
+     return description.toString();
+ }
+
public static String getConnectionMessage(XMPPConnection connection) {
return connection.getHost() + ":" + connection.getPort() + "/" + connection.getServiceName();
}
+ /**
+  * Builds a one-line log message from an {@link XMPPException}.
+  * The XMPP error, when present, is rendered as {@code [ code ] condition : message};
+  * the message of a wrapped throwable, when present, is appended in parentheses.
+  * When neither is available, the exception's own message is used so that the result
+  * is not an empty log line.
+  *
+  * @param e the exception to describe
+  * @return the assembled message; empty only if the exception carries no information at all
+  */
+ public static String getXmppExceptionLogMessage(XMPPException e) {
+     XMPPError xmppError = e.getXMPPError();
+     Throwable t = e.getWrappedThrowable();
+     // the builder never leaves this method, so the synchronized StringBuffer is unnecessary
+     StringBuilder message = new StringBuilder();
+     if (xmppError != null) {
+         message.append("[ ").append(xmppError.getCode()).append(" ] ")
+             .append(xmppError.getCondition()).append(" : ")
+             .append(xmppError.getMessage());
+     }
+     if (t != null) {
+         message.append(" ( ").append(t.getMessage()).append(" )");
+     }
+     if (message.length() == 0 && e.getMessage() != null) {
+         // no XMPP error and no wrapped throwable: fall back to the exception's own message
+         message.append(e.getMessage());
+     }
+     return message.toString();
+ }
+
public String getChatId() {
return "Chat:" + getParticipant() + ":" + getUser();
}
@@ -319,7 +342,7 @@ public class XmppEndpoint extends Defaul
public String getServiceName() {
return serviceName;
}
-
+
public HeaderFilterStrategy getHeaderFilterStrategy() {
return headerFilterStrategy;
}
@@ -328,6 +351,22 @@ public class XmppEndpoint extends Defaul
this.headerFilterStrategy = headerFilterStrategy;
}
+ /**
+  * Tells whether a failed connection attempt at startup is reported as an error
+  * (the field defaults to {@code true}) or tolerated so that the connection can be made later.
+  */
+ public boolean isTestConnectionOnStartup() {
+     return this.testConnectionOnStartup;
+ }
+
+ /**
+  * Sets whether a failed connection attempt at startup is reported as an error.
+  *
+  * @param testConnectionOnStartup {@code false} to let consumers and producers start without a connection
+  */
+ public void setTestConnectionOnStartup(boolean testConnectionOnStartup) {
+     this.testConnectionOnStartup = testConnectionOnStartup;
+ }
+
+ /**
+  * Returns the delay, in seconds, that the consumer uses both for retrying a deferred start
+  * and as the period of its connection check (the field defaults to 10).
+  */
+ public int getConnectionPollDelay() {
+     return this.connectionPollDelay;
+ }
+
+ /**
+  * Sets the delay used for deferred starts and periodic connection checks.
+  *
+  * @param connectionPollDelay the delay in seconds
+  */
+ public void setConnectionPollDelay(int connectionPollDelay) {
+     this.connectionPollDelay = connectionPollDelay;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java?rev=1377586&r1=1377585&r2=1377586&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java Mon Aug 27 07:20:52 2012
@@ -44,6 +44,23 @@ public class XmppGroupChatProducer exten
}
public void process(Exchange exchange) {
+
+ if (connection == null) {
+ try {
+ connection = endpoint.createConnection();
+ } catch (XMPPException e) {
+ throw new RuntimeExchangeException("Could not connect to XMPP server.", exchange, e);
+ }
+ }
+
+ if (chat == null) {
+ try {
+ initializeChat();
+ } catch (Exception e) {
+ throw new RuntimeExchangeException("Could not initialize XMPP chat.", exchange, e);
+ }
+ }
+
Message message = chat.createMessage();
message.setTo(room);
message.setFrom(endpoint.getUser());
@@ -52,10 +69,7 @@ public class XmppGroupChatProducer exten
try {
// make sure we are connected
if (!connection.isConnected()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection));
- }
- connection.connect();
+ this.reconnect();
}
if (LOG.isDebugEnabled()) {
@@ -66,16 +80,41 @@ public class XmppGroupChatProducer exten
// otherwise the client local queue will fill up (CAMEL-1467)
chat.pollMessage();
} catch (XMPPException e) {
- throw new RuntimeExchangeException("Cannot send XMPP message: " + message, exchange, e);
+ throw new RuntimeExchangeException("Could not send XMPP message: " + message, exchange, e);
+ }
+ }
+
+ private synchronized void reconnect() throws XMPPException {
+ if (!connection.isConnected()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection));
+ }
+ connection.connect();
}
}
@Override
protected void doStart() throws Exception {
if (connection == null) {
- connection = endpoint.createConnection();
+ try {
+ connection = endpoint.createConnection();
+ } catch (XMPPException e) {
+ if (endpoint.isTestConnectionOnStartup()) {
+ throw new RuntimeException("Could not connect to XMPP server: " + endpoint.getConnectionDescription(), e);
+ } else {
+ LOG.warn("Could not connect to XMPP server. {} Producer will attempt lazy connection when needed.", XmppEndpoint.getXmppExceptionLogMessage(e));
+ }
+ }
+ }
+
+ if (chat == null && connection != null) {
+ initializeChat();
}
+ super.doStart();
+ }
+
+ protected synchronized void initializeChat() throws XMPPException {
if (chat == null) {
room = endpoint.resolveRoom(connection);
chat = new MultiUserChat(connection, room);
@@ -86,8 +125,6 @@ public class XmppGroupChatProducer exten
LOG.info("Joined room: {} as: {}", room, endpoint.getNickname());
}
}
-
- super.doStart();
}
@Override
Modified: camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java?rev=1377586&r1=1377585&r2=1377586&view=diff
==============================================================================
--- camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java (original)
+++ camel/trunk/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java Mon Aug 27 07:20:52 2012
@@ -37,7 +37,7 @@ public class XmppPrivateChatProducer ext
private final XmppEndpoint endpoint;
private XMPPConnection connection;
private final String participant;
-
+
public XmppPrivateChatProducer(XmppEndpoint endpoint, String participant) {
super(endpoint);
this.endpoint = endpoint;
@@ -48,17 +48,18 @@ public class XmppPrivateChatProducer ext
}
public void process(Exchange exchange) {
+
+ // make sure we are connected
try {
- // make sure we are connected
+ if (connection == null) {
+ connection = endpoint.createConnection();
+ }
+
if (!connection.isConnected()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection));
- }
- connection.connect();
+ this.reconnect();
}
} catch (XMPPException e) {
- throw new RuntimeExchangeException("Cannot connect to: "
- + XmppEndpoint.getConnectionMessage(connection), exchange, e);
+ throw new RuntimeException("Could not connect to XMPP server.", e);
}
ChatManager chatManager = connection.getChatManager();
@@ -77,10 +78,10 @@ public class XmppPrivateChatProducer ext
}
chat.sendMessage(message);
} catch (XMPPException xmppe) {
- throw new RuntimeExchangeException("Cannot send XMPP message: to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message
+ throw new RuntimeExchangeException("Could not send XMPP message: to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message
+ " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe);
} catch (Exception e) {
- throw new RuntimeExchangeException("Cannot send XMPP message to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message
+ throw new RuntimeExchangeException("Could not send XMPP message to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message
+ " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e);
}
}
@@ -107,10 +108,27 @@ public class XmppPrivateChatProducer ext
return chat;
}
+ /**
+  * Re-establishes the producer's connection if it is not connected. The method is
+  * synchronized and re-checks the connection state itself before connecting.
+  *
+  * @throws XMPPException if the connection attempt fails
+  */
+ private synchronized void reconnect() throws XMPPException {
+     if (connection.isConnected()) {
+         return;
+     }
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection));
+     }
+     connection.connect();
+ }
+
@Override
protected void doStart() throws Exception {
if (connection == null) {
- connection = endpoint.createConnection();
+ try {
+ connection = endpoint.createConnection();
+ } catch (XMPPException e) {
+ if (endpoint.isTestConnectionOnStartup()) {
+ throw new RuntimeException("Could not establish connection to XMPP server: " + endpoint.getConnectionDescription(), e);
+ } else {
+ LOG.warn("Could not connect to XMPP server. {} Producer will attempt lazy connection when needed.", XmppEndpoint.getXmppExceptionLogMessage(e));
+ }
+ }
}
super.doStart();
}
Added: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java?rev=1377586&view=auto
==============================================================================
--- camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java (added)
+++ camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java Mon Aug 27 07:20:52 2012
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.xmpp;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.vysper.mina.TCPEndpoint;
+import org.apache.vysper.storage.StorageProviderRegistry;
+import org.apache.vysper.storage.inmemory.MemoryStorageProviderRegistry;
+import org.apache.vysper.xmpp.addressing.Entity;
+import org.apache.vysper.xmpp.addressing.EntityImpl;
+import org.apache.vysper.xmpp.authorization.AccountManagement;
+import org.apache.vysper.xmpp.authorization.Anonymous;
+import org.apache.vysper.xmpp.authorization.SASLMechanism;
+import org.apache.vysper.xmpp.modules.extension.xep0045_muc.MUCModule;
+import org.apache.vysper.xmpp.modules.extension.xep0045_muc.model.Conference;
+import org.apache.vysper.xmpp.modules.extension.xep0045_muc.model.RoomType;
+import org.apache.vysper.xmpp.server.XMPPServer;
+
+
+/**
+ * Lazily created singleton around an embedded Vysper {@link XMPPServer} for the XMPP tests.
+ * The server is configured with three test accounts, anonymous login, a TLS keystore loaded
+ * from the classpath and two multi-user chat rooms. Tests simulate an unavailable server by
+ * stopping and starting the TCP endpoint.
+ */
+public final class EmbeddedXmppTestServer {
+
+    private static EmbeddedXmppTestServer instance;
+
+    private XMPPServer xmppServer;
+    private TCPEndpoint endpoint;
+    private int port;
+
+    // restricted to singleton
+    private EmbeddedXmppTestServer() { }
+
+    /**
+     * Returns the shared server, creating and starting it on the first call.
+     *
+     * @throws RuntimeException if the server cannot be initialized
+     */
+    public static EmbeddedXmppTestServer instance() {
+        if (instance == null) {
+            // initialize before publishing: if start-up throws, the next call tries again
+            // instead of returning a half-built server whose endpoint may still be null
+            EmbeddedXmppTestServer candidate = new EmbeddedXmppTestServer();
+            candidate.initializeXmppServer();
+            instance = candidate;
+        }
+        return instance;
+    }
+
+    /**
+     * Configures and starts the embedded server; any failure is rethrown as a RuntimeException.
+     */
+    private void initializeXmppServer() {
+        try {
+            if (xmppServer == null) {
+                xmppServer = new XMPPServer("apache.camel");
+
+                StorageProviderRegistry providerRegistry = new MemoryStorageProviderRegistry();
+                AccountManagement accountManagement = (AccountManagement) providerRegistry.retrieve(AccountManagement.class);
+
+                Entity user = EntityImpl.parseUnchecked("camel_consumer@apache.camel");
+                accountManagement.addUser(user, "secret");
+
+                Entity user2 = EntityImpl.parseUnchecked("camel_producer@apache.camel");
+                accountManagement.addUser(user2, "secret");
+
+                Entity user3 = EntityImpl.parseUnchecked("camel_producer1@apache.camel");
+                accountManagement.addUser(user3, "secret");
+
+                xmppServer.setStorageProviderRegistry(providerRegistry);
+
+                if (endpoint == null) {
+                    endpoint = new TCPEndpoint();
+                    // NOTE(review): presumably the first free port at or above 5222; confirm against AvailablePortFinder
+                    this.port = AvailablePortFinder.getNextAvailable(5222);
+                    endpoint.setPort(port);
+                }
+
+                xmppServer.addEndpoint(endpoint);
+
+                InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream("xmppServer.jks");
+                if (stream == null) {
+                    // getResourceAsStream returns null for a missing resource; fail with a clear reason
+                    throw new IllegalStateException("Keystore xmppServer.jks was not found on the classpath.");
+                }
+                xmppServer.setTLSCertificateInfo(stream, "secret");
+
+                // allow anonymous logins
+                xmppServer.setSASLMechanisms(Arrays.asList(new SASLMechanism[]{new Anonymous()}));
+
+                xmppServer.start();
+
+                // add the multi-user chat module and create a few test rooms
+                Conference conference = new Conference("test conference");
+                conference.createRoom(EntityImpl.parseUnchecked("camel-anon@apache.camel"), "camel-anon", RoomType.FullyAnonymous);
+                conference.createRoom(EntityImpl.parseUnchecked("camel-test@apache.camel"), "camel-test", RoomType.Public);
+                xmppServer.addModule(new MUCModule("conference", conference));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("An error occurred when initializing the XMPP Test Server.", e);
+        }
+    }
+
+    /** Starts the server's TCP endpoint. */
+    public void startXmppEndpoint() throws Exception {
+        endpoint.start();
+    }
+
+    /** Stops the server's TCP endpoint. */
+    public void stopXmppEndpoint() {
+        endpoint.stop();
+    }
+
+    /** Returns the port chosen for the TCP endpoint. */
+    public int getXmppPort() {
+        return port;
+    }
+}
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/EmbeddedXmppTestServer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java?rev=1377586&view=auto
==============================================================================
--- camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java (added)
+++ camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java Mon Aug 27 07:20:52 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.xmpp;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test to verify that the XMPP producer and consumer will create deferred / lazy connections
+ * to the XMPP server when the server is not available upon route initialization. Also verify that
+ * these endpoints will then deliver messages as expected.
+ */
+public class XmppDeferredConnectionTest extends CamelTestSupport {
+
+    /**
+     * Obtains the embedded XMPP server and stops its endpoint, so the server is
+     * unreachable while the Camel routes are being set up.
+     */
+    @Override
+    public void doPreSetup() throws Exception {
+        EmbeddedXmppTestServer server = EmbeddedXmppTestServer.instance();
+        server.stopXmppEndpoint();
+    }
+
+    @Test
+    public void testXmppChatWithDelayedConnection() throws Exception {
+        MockEndpoint received = context.getEndpoint("mock:out", MockEndpoint.class);
+        MockEndpoint bypass = context.getEndpoint("mock:simple", MockEndpoint.class);
+
+        received.setExpectedMessageCount(1);
+        received.expectedBodiesReceived("Hello again!");
+        bypass.setExpectedMessageCount(1);
+
+        MockEndpoint failures = context.getEndpoint("mock:error", MockEndpoint.class);
+        failures.setExpectedMessageCount(1);
+
+        // server endpoint is stopped: delivery over XMPP has to end up on the error mock
+        template.sendBody("direct:start", "Hello!");
+        received.assertIsNotSatisfied();
+        failures.assertIsSatisfied();
+
+        // a route without XMPP endpoints must still work, proving the context started
+        template.sendBody("direct:simple", "Hello simple!");
+        bypass.assertIsSatisfied();
+
+        EmbeddedXmppTestServer.instance().startXmppEndpoint();
+
+        // give the consumer's poll (connectionPollDelay=1) time to connect
+        Thread.sleep(2000);
+
+        // with the server reachable the message has to arrive at the consumer
+        template.sendBody("direct:start", "Hello again!");
+        received.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                onException(RuntimeException.class).handled(true).to("mock:error");
+
+                from("direct:start").to(getProducerUri());
+                from(getConsumerUri()).id("test-consumer").to("mock:out");
+                from("direct:simple").to("mock:simple");
+            }
+        };
+    }
+
+    /** URI of the producer side; it does not test the connection on startup. */
+    protected String getProducerUri() {
+        int xmppPort = EmbeddedXmppTestServer.instance().getXmppPort();
+        return "xmpp://localhost:" + xmppPort
+            + "/camel_consumer@apache.camel?user=camel_producer&password=secret&serviceName=apache.camel"
+            + "&testConnectionOnStartup=false";
+    }
+
+    /** URI of the consumer side; it does not test the connection on startup and polls every second. */
+    protected String getConsumerUri() {
+        int xmppPort = EmbeddedXmppTestServer.instance().getXmppPort();
+        return "xmpp://localhost:" + xmppPort
+            + "/camel_consumer@apache.camel?user=camel_consumer&password=secret&serviceName=apache.camel"
+            + "&testConnectionOnStartup=false&connectionPollDelay=1";
+    }
+
+}
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppDeferredConnectionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java?rev=1377586&view=auto
==============================================================================
--- camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java (added)
+++ camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java Mon Aug 27 07:20:52 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.xmpp;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test to verify that the XMPP consumer will reconnect when the connection is lost.
+ * Also verifies that the XMPP producer will lazily re-establish a lost connection.
+ */
+public class XmppRobustConnectionTest extends CamelTestSupport {
+
+    @Test
+    public void testXmppChatWithRobustConnection() throws Exception {
+        MockEndpoint received = context.getEndpoint("mock:out", MockEndpoint.class);
+        MockEndpoint failures = context.getEndpoint("mock:error", MockEndpoint.class);
+
+        received.setExpectedMessageCount(10);
+        failures.setExpectedMessageCount(5);
+
+        // first batch, server available
+        sendBatch();
+
+        received.assertIsNotSatisfied();
+        failures.assertIsNotSatisfied();
+
+        // second batch, server endpoint stopped
+        EmbeddedXmppTestServer.instance().stopXmppEndpoint();
+        Thread.sleep(2000);
+
+        sendBatch();
+
+        failures.assertIsSatisfied();
+        received.assertIsNotSatisfied();
+
+        // third batch, server endpoint started again
+        EmbeddedXmppTestServer.instance().startXmppEndpoint();
+        Thread.sleep(2000);
+
+        sendBatch();
+
+        received.assertIsSatisfied();
+    }
+
+    /** Sends five numbered test messages to the producer route. */
+    private void sendBatch() {
+        int index = 0;
+        while (index < 5) {
+            template.sendBody("direct:start", "Test message [ " + index + " ]");
+            index++;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                onException(RuntimeException.class).handled(true).to("mock:error");
+
+                from("direct:start").id("direct:start").to(getProducerUri());
+                from(getConsumerUri()).to("mock:out");
+            }
+        };
+    }
+
+    /** URI of the producer side, using the endpoint's default connection options. */
+    protected String getProducerUri() {
+        int xmppPort = EmbeddedXmppTestServer.instance().getXmppPort();
+        return "xmpp://localhost:" + xmppPort
+            + "/camel_consumer@apache.camel?user=camel_producer&password=secret&serviceName=apache.camel";
+    }
+
+    /** URI of the consumer side, with a one second connection poll delay. */
+    protected String getConsumerUri() {
+        int xmppPort = EmbeddedXmppTestServer.instance().getXmppPort();
+        return "xmpp://localhost:" + xmppPort
+            + "/camel_consumer@apache.camel?user=camel_consumer&password=secret&serviceName=apache.camel"
+            + "&connectionPollDelay=1";
+    }
+}
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRobustConnectionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date