You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/26 16:59:14 UTC

svn commit: r468026 - in /incubator/activemq/trunk/activemq-xmpp: ./ src/main/java/org/apache/activemq/transport/xmpp/ src/test/java/org/apache/activemq/transport/xmpp/

Author: jstrachan
Date: Thu Oct 26 07:59:11 2006
New Revision: 468026

URL: http://svn.apache.org/viewvc?view=rev&rev=468026
Log:
fixed up the test case so its working now along with fixed a schoolboy error in sending messages over XMPP so that works nicely now. FWIW you can now connect via a Jabber client to the broker and interchange messages with the example JMS programs and the web console!

Modified:
    incubator/activemq/trunk/activemq-xmpp/pom.xml
    incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
    incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
    incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java

Modified: incubator/activemq/trunk/activemq-xmpp/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/pom.xml?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/pom.xml (original)
+++ incubator/activemq/trunk/activemq-xmpp/pom.xml Thu Oct 26 07:59:11 2006
@@ -177,16 +177,11 @@
         </configuration>
       </plugin>
 
-      <!-- Configure which tests are included/excuded -->
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
-          <!--
-          <includes>
-            <include>**/*Test.*</include>
-          </includes>
-          -->
           <excludes>
+            <!--<exclude>**/XmppTest.*</exclude>-->
           </excludes>
         </configuration>
       </plugin>

Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java Thu Oct 26 07:59:11 2006
@@ -353,14 +353,21 @@
         item.setNick("broker");
         sendPresence(presence, item);
 
+        /*
         item = new org.jabber.protocol.muc_user.Item();
         item.setAffiliation("admin");
         item.setRole("moderator");
         sendPresence(presence, item);
+        */
 
         // lets create a subscription
         final String to = presence.getTo();
 
+        ActiveMQDestination destination = createActiveMQDestination(to);
+        if (destination == null) {
+            log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
+            return;
+        }
 
         boolean createConsumer = false;
         ConsumerInfo consumerInfo = null;
@@ -373,6 +380,7 @@
                 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
                 consumerInfo.setConsumerId(consumerId);
                 consumerInfo.setPrefetchSize(10);
+                consumerInfo.setNoLocal(true);
                 createConsumer = true;
             }
         }
@@ -380,7 +388,6 @@
             return;
         }
 
-        ActiveMQDestination destination = createActiveMQDestination(to);
         consumerInfo.setDestination(destination);
 
         subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
@@ -508,14 +515,16 @@
         activeMQMessage.setTimestamp(System.currentTimeMillis());
         addActiveMQMessageHeaders(activeMQMessage, message);
 
+        /*
         MessageDispatch dispatch = new MessageDispatch();
         dispatch.setDestination(destination);
         dispatch.setMessage(activeMQMessage);
+        */
 
         if (log.isDebugEnabled()) {
             log.debug("Sending ActiveMQ message: " + activeMQMessage);
         }
-        sendToActiveMQ(dispatch, createErrorHandler("send message"));
+        sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
     }
 
     protected Handler<Response> createErrorHandler(final String text) {

Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java Thu Oct 26 07:59:11 2006
@@ -69,7 +69,8 @@
     protected OutputStream outputStream;
     protected InputStream inputStream;
     private ProtocolConverter converter;
-    private String from;
+    private String from = "localhost";
+    private String brokerId = "broker-id-1";
 
     public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
         super(wireFormat, socket);
@@ -94,24 +95,10 @@
             if (command instanceof BrokerInfo) {
                 BrokerInfo brokerInfo = (BrokerInfo) command;
 
-                String id = brokerInfo.getBrokerId().toString();
+                brokerId = brokerInfo.getBrokerId().toString();
                 from = brokerInfo.getBrokerName();
                 try {
-                    writeOpenStream(id, from);
-
-                    // now lets write the features
-                    Features features = new Features();
-
-                    // TODO support TLS
-                    //features.getAny().add(new Starttls());
-
-                    Mechanisms mechanisms = new Mechanisms();
-
-                    // TODO support SASL
-                    //mechanisms.getMechanism().add("DIGEST-MD5");
-                    //mechanisms.getMechanism().add("PLAIN");
-                    features.getAny().add(mechanisms);
-                    marshall(features);
+                    writeOpenStream(brokerId, from);
                 }
                 catch (XMLStreamException e) {
                     throw IOExceptionSupport.create(e);
@@ -139,9 +126,14 @@
      * Marshalls the given POJO to the client
      */
     public void marshall(Object command) throws IOException {
+        if (isStopped() || isStopping()) {
+            log.warn("Not marshalling command as shutting down: " + command);
+            return;
+        }
         try {
             marshaller.marshal(command, xmlWriter);
             xmlWriter.flush();
+            outputStream.flush();
         }
         catch (JAXBException e) {
             throw IOExceptionSupport.create(e);
@@ -193,7 +185,8 @@
                     if (event.getEventType() == XMLEvent.END_ELEMENT) {
                         break;
                     }
-                    else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
+                    else
+                    if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
                         break;
                     }
                     else {
@@ -250,16 +243,30 @@
     @Override
     protected void initializeStreams() throws Exception {
         // TODO it would be preferable to use class discovery here!
-        context = JAXBContext.newInstance("jabber.client" + ":jabber.server"
-                + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.gateway" + ":jabber.iq.last" + ":jabber.iq.oob"
-                + ":jabber.iq.pass" + ":jabber.iq.roster" + ":jabber.iq.time" + ":jabber.iq.version"
-                + ":org.jabber.etherx.streams" + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
+        context = JAXBContext.newInstance("jabber.client"
+                /*
+                + ":jabber.server"
+                + ":jabber.iq.gateway"
+                + ":jabber.iq.last"
+                + ":jabber.iq.oob"
+                + ":jabber.iq.pass"
+                + ":jabber.iq.time"
+                + ":jabber.iq.version"
+                + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
                 + ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
+                + ":org.jabber.protocol.muc_admin"
+                + ":org.jabber.protocol.muc_unique"
+                */
+                + ":jabber.iq._private"
+                + ":jabber.iq.auth"
+                + ":jabber.iq.roster"
+                + ":org.jabber.etherx.streams"
                 + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
-                + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_admin"
-                + ":org.jabber.protocol.muc_unique" + ":org.jabber.protocol.muc_user"
+                + ":org.jabber.protocol.muc"
+                + ":org.jabber.protocol.muc_user"
                 + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
-                + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
+                + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
+        );
 
         inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
         outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
@@ -270,6 +277,7 @@
     }
 
     protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
+        log.debug("Sending initial stream element");
         XMLOutputFactory factory = XMLOutputFactory.newInstance();
         //factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
         xmlWriter = factory.createXMLStreamWriter(outputStream);
@@ -286,7 +294,23 @@
         }
         xmlWriter.writeAttribute("to", to);
         xmlWriter.writeAttribute("from", from);
-        xmlWriter.writeCharacters("\n");
+
+
+        // now lets write the features
+        Features features = new Features();
+
+        // TODO support TLS
+        //features.getAny().add(new Starttls());
+
+        Mechanisms mechanisms = new Mechanisms();
+
+        // TODO support SASL
+        //mechanisms.getMechanism().add("DIGEST-MD5");
+        //mechanisms.getMechanism().add("PLAIN");
+        features.getAny().add(mechanisms);
+        marshall(features);
+
+        log.debug("Initial stream element sent!");
     }
 
 }

Modified: incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java Thu Oct 26 07:59:11 2006
@@ -18,40 +18,25 @@
 package org.apache.activemq.transport.xmpp;
 
 import junit.framework.TestCase;
+import junit.textui.TestRunner;
 import org.jivesoftware.smack.Chat;
-import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
 
 /**
  * @version $Revision$
  */
 public class XmppTest extends TestCase {
 
+    protected static boolean block = false;
+
     private XmppBroker broker = new XmppBroker();
-    private boolean block = false;
 
     public static void main(String[] args) {
-        XmppTest test = new XmppTest();
-        test.block = true;
-        try {
-            test.setUp();
-            test.testConnect();
-        }
-        catch (Exception e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
-        }
-        finally {
-            try {
-                test.tearDown();
-            }
-            catch (Exception e) {
-                System.out.println("Caught: " + e);
-                e.printStackTrace();
-            }
-        }
-
+        block = true;
+        TestRunner.run(XmppTest.class);
     }
+
     public void testConnect() throws Exception {
         //ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
         //config.setDebuggerEnabled(true);
@@ -69,8 +54,13 @@
             System.out.println("Sent all messages!");
         }
         catch (XMPPException e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
+            if (block) {
+                System.out.println("Caught: " + e);
+                e.printStackTrace();
+            }
+            else {
+                throw e;
+            }
         }
         if (block) {
             Thread.sleep(20000);