You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/25 13:24:57 UTC

svn commit: r467606 - in /incubator/activemq/trunk/activemq-xmpp/src: main/java/org/apache/activemq/transport/xmpp/ test/java/org/apache/activemq/transport/xmpp/

Author: jstrachan
Date: Wed Oct 25 04:24:56 2006
New Revision: 467606

URL: http://svn.apache.org/viewvc?view=rev&rev=467606
Log:
managed to get the XMPP transport to subscribe to topics and receive messages when tested via the Spark Jabber client

Modified:
    incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
    incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
    incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java

Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java Wed Oct 25 04:24:56 2006
@@ -45,6 +45,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -76,6 +77,8 @@
 
     private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
     private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
+    private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
+
     private final Map transactions = new ConcurrentHashMap();
 
     private final Object commnadIdMutex = new Object();
@@ -142,6 +145,9 @@
             if (handler != null) {
                 handler.handle(response);
             }
+            else {
+                log.warn("No handler for response: " + response);
+            }
         }
         else if (command.isMessageDispatch()) {
             MessageDispatch md = (MessageDispatch) command;
@@ -149,6 +155,9 @@
             if (handler != null) {
                 handler.handle(md);
             }
+            else {
+                log.warn("No handler for message: " + md);
+            }
         }
     }
 
@@ -160,53 +169,7 @@
         Object any = iq.getAny();
 
         if (any instanceof Query) {
-            Query query = (Query) any;
-            if (log.isDebugEnabled()) {
-                log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
-            }
-            if (query.getPassword() == null) {
-                Iq result = createResult(iq);
-                Query required = new Query();
-                required.setPassword("");
-                required.setUsername("");
-                result.setAny(required);
-                transport.marshall(result);
-                return;
-            }
-
-            //connectionInfo.setClientId(query.getResource());
-            connectionInfo.setUserName(query.getUsername());
-            connectionInfo.setPassword(query.getPassword());
-
-            // TODO support digest?
-
-            if (connectionInfo.getClientId() == null) {
-                connectionInfo.setClientId(clientIdGenerator.generateId());
-            }
-
-            sendToActiveMQ(connectionInfo, new Handler<Response>() {
-                public void handle(Response response) throws Exception {
-
-                    Iq result = createResult(iq);
-
-                    if (response instanceof ExceptionResponse) {
-                        ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                        jabber.client.Error error = new jabber.client.Error();
-                        result.setError(error);
-
-                        StringWriter buffer = new StringWriter();
-                        exceptionResponse.getException().printStackTrace(new PrintWriter(buffer));
-                        error.setInternalServerError(buffer.toString());
-                    }
-                    else {
-                        connected.set(true);
-                    }
-                    transport.marshall(result);
-
-                    sendToActiveMQ(sessionInfo, null);
-                    sendToActiveMQ(producerInfo, null);
-                }
-            });
+            onAuthQuery(any, iq);
 
         }
         else if (any instanceof jabber.iq._private.Query) {
@@ -255,6 +218,60 @@
         }
     }
 
+    protected void onAuthQuery(Object any, final Iq iq) throws IOException { // Handles an Iq whose payload is an auth Query: asks for credentials or sends connectionInfo to ActiveMQ
+        Query query = (Query) any; // unchecked cast; the visible caller tests instanceof Query first
+        if (log.isDebugEnabled()) {
+            log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
+        }
+        if (query.getPassword() == null) { // no password supplied: reply with empty username/password fields and stop
+            Iq result = createResult(iq);
+            Query required = new Query();
+            required.setPassword("");
+            required.setUsername("");
+            result.setAny(required);
+            transport.marshall(result);
+            return;
+        }
+
+        //connectionInfo.setClientId(query.getResource());
+        connectionInfo.setUserName(query.getUsername());
+        connectionInfo.setPassword(query.getPassword());
+
+        // TODO support digest?
+
+        if (connectionInfo.getClientId() == null) { // generate a client id only when none is set yet
+            connectionInfo.setClientId(clientIdGenerator.generateId());
+        }
+
+        sendToActiveMQ(connectionInfo, new Handler<Response>() { // presumably the handler is invoked with the broker's response to connectionInfo
+            public void handle(Response response) throws Exception {
+
+                Iq result = createResult(iq);
+
+                if (response instanceof ExceptionResponse) { // failure: log it and attach the stack trace to the Iq result as an error
+                    ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+                    Throwable exception = exceptionResponse.getException();
+
+                    log.warn("Failed to create connection: " + exception, exception);
+
+                    Error error = new Error(); // NOTE(review): unqualified Error; the removed code used jabber.client.Error, so presumably an import shadows java.lang.Error - confirm
+                    result.setError(error);
+
+                    StringWriter buffer = new StringWriter();
+                    exception.printStackTrace(new PrintWriter(buffer));
+                    error.setInternalServerError(buffer.toString()); // the full stack trace text goes into the result sent to the client
+                }
+                else {
+                    connected.set(true); // only a non-exception response marks the connection as established
+                }
+                transport.marshall(result); // the result is marshalled in both the success and the failure case
+
+                sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion")); // NOTE(review): session and producer are requested even after an ExceptionResponse - confirm this is intended
+                sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
+            }
+        });
+    }
+
     protected String debugString(Iq iq) {
         return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
     }
@@ -325,20 +342,101 @@
         transport.marshall(result);
     }
 
-    protected void onPresence(Presence presence) throws IOException {
+    protected void onPresence(Presence presence) throws IOException, JMSException { // Answers a presence stanza twice and creates a consumer for the destination named by its 'to' address
         if (log.isDebugEnabled()) {
             log.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType()
                     + " showOrStatusOrPriority: " + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
         }
+        org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item(); // first reply: owner/moderator item with nick "broker"
+        item.setAffiliation("owner");
+        item.setRole("moderator");
+        item.setNick("broker");
+        sendPresence(presence, item);
+
+        item = new org.jabber.protocol.muc_user.Item(); // second reply: admin/moderator item with no nick set
+        item.setAffiliation("admin");
+        item.setRole("moderator");
+        sendPresence(presence, item);
+
+        // lets create a subscription
+        final String to = presence.getTo(); // used as the map key, the destination name and the XMPP recipient
+
+
+        boolean createConsumer = false;
+        ConsumerInfo consumerInfo = null;
+        synchronized (jidToConsumerMap) { // lookup and insert happen under one lock on the map
+            consumerInfo = jidToConsumerMap.get(to);
+            if (consumerInfo == null) { // first presence seen for this 'to': register a new ConsumerInfo
+                consumerInfo = new ConsumerInfo();
+                jidToConsumerMap.put(to, consumerInfo);
+
+                ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+                consumerInfo.setConsumerId(consumerId);
+                consumerInfo.setPrefetchSize(10);
+                createConsumer = true;
+            }
+        }
+        if (!createConsumer) { // a ConsumerInfo already exists for this 'to'; nothing more to do
+            return;
+        }
+
+        ActiveMQDestination destination = createActiveMQDestination(to);
+        consumerInfo.setDestination(destination); // set after the lock has been released
+
+        subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() { // dispatch handler is registered before consumerInfo is sent
+            public void handle(MessageDispatch messageDispatch) throws Exception {
+                // processing the inbound message
+                if (log.isDebugEnabled()) {
+                    log.debug("Receiving inbound: " + messageDispatch.getMessage());
+                }
+
+                // lets send back an ACK
+                MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1); // the ack is sent before the message is converted and marshalled
+                sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
+
+                Message message = createXmppMessage(to, messageDispatch);
+                if (message != null) { // NOTE(review): createXmppMessage as shown in this diff always returns a Message
+                    if (log.isDebugEnabled()) {
+                        log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() +  " type: " + message.getType() + " with body: " + message.getAny());
+                    }
+                    transport.marshall(message);
+                }
+            }
+        });
+        sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
+    }
+
+    protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException { // Builds a "groupchat" XMPP Message from a dispatched ActiveMQ message
+        Message answer = new Message();
+        answer.setType("groupchat");
+        String from = to; // the sender address is derived from the recipient address
+        int idx= from.indexOf('/');
+        if (idx > 0) { // replace everything after the first '/' with "broker"; no '/' (or '/' at index 0) leaves the address unchanged
+            from = from.substring(0, idx) + "/broker";
+        }
+        answer.setFrom(from);
+        answer.setTo(to);
+
+        org.apache.activemq.command.Message message = messageDispatch.getMessage();
+        //answer.setType(message.getType());
+        if (message instanceof ActiveMQTextMessage) { // only text messages contribute a Body
+            ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
+            Body body = new Body();
+            body.setValue(activeMQTextMessage.getText());
+            answer.getAny().add(body);
+        }
+        else { // any other message type yields a Message to which no Body is added
+            // TODO support other message types
+        }
+        return answer; // a Message is returned on every path
+    }
+
+    protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
         Presence answer = new Presence();
         answer.setFrom(presence.getTo());
         answer.setType(presence.getType());
         answer.setTo(presence.getFrom());
         X x = new X();
-        org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
-        item.setAffiliation("owner");
-        item.setRole("moderator");
-        item.setNick("broker");
         x.getDeclineOrDestroyOrInvite().add(item);
         answer.getShowOrStatusOrPriority().add(x);
         transport.marshall(answer);
@@ -400,19 +498,41 @@
             log.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
         }
 
-        ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
+        final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
 
         ActiveMQDestination destination = createActiveMQDestination(message.getTo());
 
         activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
         activeMQMessage.setDestination(destination);
+        activeMQMessage.setProducerId(producerId);
+        activeMQMessage.setTimestamp(System.currentTimeMillis());
         addActiveMQMessageHeaders(activeMQMessage, message);
 
         MessageDispatch dispatch = new MessageDispatch();
         dispatch.setDestination(destination);
         dispatch.setMessage(activeMQMessage);
-        sendToActiveMQ(dispatch, null);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Sending ActiveMQ message: " + activeMQMessage);
+        }
+        sendToActiveMQ(dispatch, createErrorHandler("send message"));
+    }
+
+    protected Handler<Response> createErrorHandler(final String text) { // Returns a handler that logs the outcome of the operation described by text
+        return new Handler<Response>() {
+            public void handle(Response event) throws Exception {
+                if (event instanceof ExceptionResponse) { // failure: logged at error level together with the exception
+                    ExceptionResponse exceptionResponse = (ExceptionResponse) event;
+                    Throwable exception = exceptionResponse.getException();
+                    log.error("Failed to " + text + ". Reason: " + exception, exception);
+                }
+                else if (log.isDebugEnabled()) { // success is only reported when debug logging is enabled
+                    log.debug("Completed " + text);
+                }
+            }
+        };
     }
+
 
     /**
      * Converts the Jabber destination name into a destination in ActiveMQ

Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java Wed Oct 25 04:24:56 2006
@@ -101,16 +101,17 @@
 
                     // now lets write the features
                     Features features = new Features();
+
+                    // TODO support TLS
                     //features.getAny().add(new Starttls());
+
                     Mechanisms mechanisms = new Mechanisms();
+
+                    // TODO support SASL
                     //mechanisms.getMechanism().add("DIGEST-MD5");
                     //mechanisms.getMechanism().add("PLAIN");
                     features.getAny().add(mechanisms);
                     marshall(features);
-                    /*
-                    xmlWriter.flush();
-                    outputStream.flush();
-                    */
                 }
                 catch (XMLStreamException e) {
                     throw IOExceptionSupport.create(e);

Modified: incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java?view=diff&rev=467606&r1=467605&r2=467606
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java Wed Oct 25 04:24:56 2006
@@ -28,8 +28,30 @@
 public class XmppTest extends TestCase {
 
     private XmppBroker broker = new XmppBroker();
-    private boolean block = true;
+    private boolean block = false;
 
+    public static void main(String[] args) { // Runs setUp, testConnect and tearDown directly, with block forced to true
+        XmppTest test = new XmppTest();
+        test.block = true; // the field now defaults to false; only this entry point turns it on
+        try {
+            test.setUp();
+            test.testConnect();
+        }
+        catch (Exception e) { // report the failure and fall through to tearDown
+            System.out.println("Caught: " + e);
+            e.printStackTrace();
+        }
+        finally { // tearDown runs whether or not the test body threw
+            try {
+                test.tearDown();
+            }
+            catch (Exception e) {
+                System.out.println("Caught: " + e);
+                e.printStackTrace();
+            }
+        }
+
+    }
     public void testConnect() throws Exception {
         //ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
         //config.setDebuggerEnabled(true);