You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/25 13:24:57 UTC
svn commit: r467606 - in /incubator/activemq/trunk/activemq-xmpp/src:
main/java/org/apache/activemq/transport/xmpp/
test/java/org/apache/activemq/transport/xmpp/
Author: jstrachan
Date: Wed Oct 25 04:24:56 2006
New Revision: 467606
URL: http://svn.apache.org/viewvc?view=rev&rev=467606
Log:
Managed to get the XMPP transport to subscribe to topics and receive messages when tested via the Spark Jabber client.
Modified:
incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java Wed Oct 25 04:24:56 2006
@@ -45,6 +45,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -76,6 +77,8 @@
private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
+ private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
+
private final Map transactions = new ConcurrentHashMap();
private final Object commnadIdMutex = new Object();
@@ -142,6 +145,9 @@
if (handler != null) {
handler.handle(response);
}
+ else {
+ log.warn("No handler for response: " + response);
+ }
}
else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
@@ -149,6 +155,9 @@
if (handler != null) {
handler.handle(md);
}
+ else {
+ log.warn("No handler for message: " + md);
+ }
}
}
@@ -160,53 +169,7 @@
Object any = iq.getAny();
if (any instanceof Query) {
- Query query = (Query) any;
- if (log.isDebugEnabled()) {
- log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
- }
- if (query.getPassword() == null) {
- Iq result = createResult(iq);
- Query required = new Query();
- required.setPassword("");
- required.setUsername("");
- result.setAny(required);
- transport.marshall(result);
- return;
- }
-
- //connectionInfo.setClientId(query.getResource());
- connectionInfo.setUserName(query.getUsername());
- connectionInfo.setPassword(query.getPassword());
-
- // TODO support digest?
-
- if (connectionInfo.getClientId() == null) {
- connectionInfo.setClientId(clientIdGenerator.generateId());
- }
-
- sendToActiveMQ(connectionInfo, new Handler<Response>() {
- public void handle(Response response) throws Exception {
-
- Iq result = createResult(iq);
-
- if (response instanceof ExceptionResponse) {
- ExceptionResponse exceptionResponse = (ExceptionResponse) response;
- jabber.client.Error error = new jabber.client.Error();
- result.setError(error);
-
- StringWriter buffer = new StringWriter();
- exceptionResponse.getException().printStackTrace(new PrintWriter(buffer));
- error.setInternalServerError(buffer.toString());
- }
- else {
- connected.set(true);
- }
- transport.marshall(result);
-
- sendToActiveMQ(sessionInfo, null);
- sendToActiveMQ(producerInfo, null);
- }
- });
+ onAuthQuery(any, iq);
}
else if (any instanceof jabber.iq._private.Query) {
@@ -255,6 +218,60 @@
}
}
+ protected void onAuthQuery(Object any, final Iq iq) throws IOException {
+ Query query = (Query) any;
+ if (log.isDebugEnabled()) {
+ log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
+ }
+ if (query.getPassword() == null) {
+ Iq result = createResult(iq);
+ Query required = new Query();
+ required.setPassword("");
+ required.setUsername("");
+ result.setAny(required);
+ transport.marshall(result);
+ return;
+ }
+
+ //connectionInfo.setClientId(query.getResource());
+ connectionInfo.setUserName(query.getUsername());
+ connectionInfo.setPassword(query.getPassword());
+
+ // TODO support digest?
+
+ if (connectionInfo.getClientId() == null) {
+ connectionInfo.setClientId(clientIdGenerator.generateId());
+ }
+
+ sendToActiveMQ(connectionInfo, new Handler<Response>() {
+ public void handle(Response response) throws Exception {
+
+ Iq result = createResult(iq);
+
+ if (response instanceof ExceptionResponse) {
+ ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+ Throwable exception = exceptionResponse.getException();
+
+ log.warn("Failed to create connection: " + exception, exception);
+
+ Error error = new Error();
+ result.setError(error);
+
+ StringWriter buffer = new StringWriter();
+ exception.printStackTrace(new PrintWriter(buffer));
+ error.setInternalServerError(buffer.toString());
+ }
+ else {
+ connected.set(true);
+ }
+ transport.marshall(result);
+
+ sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion"));
+ sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
+ }
+ });
+ }
+
protected String debugString(Iq iq) {
return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
}
@@ -325,20 +342,101 @@
transport.marshall(result);
}
- protected void onPresence(Presence presence) throws IOException {
+ protected void onPresence(Presence presence) throws IOException, JMSException {
if (log.isDebugEnabled()) {
log.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType()
+ " showOrStatusOrPriority: " + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
}
+ org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
+ item.setAffiliation("owner");
+ item.setRole("moderator");
+ item.setNick("broker");
+ sendPresence(presence, item);
+
+ item = new org.jabber.protocol.muc_user.Item();
+ item.setAffiliation("admin");
+ item.setRole("moderator");
+ sendPresence(presence, item);
+
+ // lets create a subscription
+ final String to = presence.getTo();
+
+
+ boolean createConsumer = false;
+ ConsumerInfo consumerInfo = null;
+ synchronized (jidToConsumerMap) {
+ consumerInfo = jidToConsumerMap.get(to);
+ if (consumerInfo == null) {
+ consumerInfo = new ConsumerInfo();
+ jidToConsumerMap.put(to, consumerInfo);
+
+ ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+ consumerInfo.setConsumerId(consumerId);
+ consumerInfo.setPrefetchSize(10);
+ createConsumer = true;
+ }
+ }
+ if (!createConsumer) {
+ return;
+ }
+
+ ActiveMQDestination destination = createActiveMQDestination(to);
+ consumerInfo.setDestination(destination);
+
+ subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
+ public void handle(MessageDispatch messageDispatch) throws Exception {
+ // processing the inbound message
+ if (log.isDebugEnabled()) {
+ log.debug("Receiving inbound: " + messageDispatch.getMessage());
+ }
+
+ // lets send back an ACK
+ MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
+ sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
+
+ Message message = createXmppMessage(to, messageDispatch);
+ if (message != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
+ }
+ transport.marshall(message);
+ }
+ }
+ });
+ sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
+ }
+
+ protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException {
+ Message answer = new Message();
+ answer.setType("groupchat");
+ String from = to;
+ int idx= from.indexOf('/');
+ if (idx > 0) {
+ from = from.substring(0, idx) + "/broker";
+ }
+ answer.setFrom(from);
+ answer.setTo(to);
+
+ org.apache.activemq.command.Message message = messageDispatch.getMessage();
+ //answer.setType(message.getType());
+ if (message instanceof ActiveMQTextMessage) {
+ ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
+ Body body = new Body();
+ body.setValue(activeMQTextMessage.getText());
+ answer.getAny().add(body);
+ }
+ else {
+ // TODO support other message types
+ }
+ return answer;
+ }
+
+ protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
Presence answer = new Presence();
answer.setFrom(presence.getTo());
answer.setType(presence.getType());
answer.setTo(presence.getFrom());
X x = new X();
- org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
- item.setAffiliation("owner");
- item.setRole("moderator");
- item.setNick("broker");
x.getDeclineOrDestroyOrInvite().add(item);
answer.getShowOrStatusOrPriority().add(x);
transport.marshall(answer);
@@ -400,19 +498,41 @@
log.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
}
- ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
+ final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
ActiveMQDestination destination = createActiveMQDestination(message.getTo());
activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
activeMQMessage.setDestination(destination);
+ activeMQMessage.setProducerId(producerId);
+ activeMQMessage.setTimestamp(System.currentTimeMillis());
addActiveMQMessageHeaders(activeMQMessage, message);
MessageDispatch dispatch = new MessageDispatch();
dispatch.setDestination(destination);
dispatch.setMessage(activeMQMessage);
- sendToActiveMQ(dispatch, null);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sending ActiveMQ message: " + activeMQMessage);
+ }
+ sendToActiveMQ(dispatch, createErrorHandler("send message"));
+ }
+
+ protected Handler<Response> createErrorHandler(final String text) {
+ return new Handler<Response>() {
+ public void handle(Response event) throws Exception {
+ if (event instanceof ExceptionResponse) {
+ ExceptionResponse exceptionResponse = (ExceptionResponse) event;
+ Throwable exception = exceptionResponse.getException();
+ log.error("Failed to " + text + ". Reason: " + exception, exception);
+ }
+ else if (log.isDebugEnabled()) {
+ log.debug("Completed " + text);
+ }
+ }
+ };
}
+
/**
* Converts the Jabber destination name into a destination in ActiveMQ
Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java Wed Oct 25 04:24:56 2006
@@ -101,16 +101,17 @@
// now lets write the features
Features features = new Features();
+
+ // TODO support TLS
//features.getAny().add(new Starttls());
+
Mechanisms mechanisms = new Mechanisms();
+
+ // TODO support SASL
//mechanisms.getMechanism().add("DIGEST-MD5");
//mechanisms.getMechanism().add("PLAIN");
features.getAny().add(mechanisms);
marshall(features);
- /*
- xmlWriter.flush();
- outputStream.flush();
- */
}
catch (XMLStreamException e) {
throw IOExceptionSupport.create(e);
Modified: incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java Wed Oct 25 04:24:56 2006
@@ -28,8 +28,30 @@
public class XmppTest extends TestCase {
private XmppBroker broker = new XmppBroker();
- private boolean block = true;
+ private boolean block = false;
+ public static void main(String[] args) {
+ XmppTest test = new XmppTest();
+ test.block = true;
+ try {
+ test.setUp();
+ test.testConnect();
+ }
+ catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ }
+ finally {
+ try {
+ test.tearDown();
+ }
+ catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ }
public void testConnect() throws Exception {
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);