You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/26 16:59:14 UTC
svn commit: r468026 - in /incubator/activemq/trunk/activemq-xmpp: ./
src/main/java/org/apache/activemq/transport/xmpp/
src/test/java/org/apache/activemq/transport/xmpp/
Author: jstrachan
Date: Thu Oct 26 07:59:11 2006
New Revision: 468026
URL: http://svn.apache.org/viewvc?view=rev&rev=468026
Log:
Fixed up the test case so it's working now, and fixed a schoolboy error in sending messages over XMPP so that works nicely now. FWIW you can now connect via a Jabber client to the broker and exchange messages with the example JMS programs and the web console!
Modified:
incubator/activemq/trunk/activemq-xmpp/pom.xml
incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
Modified: incubator/activemq/trunk/activemq-xmpp/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/pom.xml?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/pom.xml (original)
+++ incubator/activemq/trunk/activemq-xmpp/pom.xml Thu Oct 26 07:59:11 2006
@@ -177,16 +177,11 @@
</configuration>
</plugin>
- <!-- Configure which tests are included/excuded -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <!--
- <includes>
- <include>**/*Test.*</include>
- </includes>
- -->
<excludes>
+ <!--<exclude>**/XmppTest.*</exclude>-->
</excludes>
</configuration>
</plugin>
Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java Thu Oct 26 07:59:11 2006
@@ -353,14 +353,21 @@
item.setNick("broker");
sendPresence(presence, item);
+ /*
item = new org.jabber.protocol.muc_user.Item();
item.setAffiliation("admin");
item.setRole("moderator");
sendPresence(presence, item);
+ */
// lets create a subscription
final String to = presence.getTo();
+ ActiveMQDestination destination = createActiveMQDestination(to);
+ if (destination == null) {
+ log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
+ return;
+ }
boolean createConsumer = false;
ConsumerInfo consumerInfo = null;
@@ -373,6 +380,7 @@
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
consumerInfo.setConsumerId(consumerId);
consumerInfo.setPrefetchSize(10);
+ consumerInfo.setNoLocal(true);
createConsumer = true;
}
}
@@ -380,7 +388,6 @@
return;
}
- ActiveMQDestination destination = createActiveMQDestination(to);
consumerInfo.setDestination(destination);
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
@@ -508,14 +515,16 @@
activeMQMessage.setTimestamp(System.currentTimeMillis());
addActiveMQMessageHeaders(activeMQMessage, message);
+ /*
MessageDispatch dispatch = new MessageDispatch();
dispatch.setDestination(destination);
dispatch.setMessage(activeMQMessage);
+ */
if (log.isDebugEnabled()) {
log.debug("Sending ActiveMQ message: " + activeMQMessage);
}
- sendToActiveMQ(dispatch, createErrorHandler("send message"));
+ sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
}
protected Handler<Response> createErrorHandler(final String text) {
Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java Thu Oct 26 07:59:11 2006
@@ -69,7 +69,8 @@
protected OutputStream outputStream;
protected InputStream inputStream;
private ProtocolConverter converter;
- private String from;
+ private String from = "localhost";
+ private String brokerId = "broker-id-1";
public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
@@ -94,24 +95,10 @@
if (command instanceof BrokerInfo) {
BrokerInfo brokerInfo = (BrokerInfo) command;
- String id = brokerInfo.getBrokerId().toString();
+ brokerId = brokerInfo.getBrokerId().toString();
from = brokerInfo.getBrokerName();
try {
- writeOpenStream(id, from);
-
- // now lets write the features
- Features features = new Features();
-
- // TODO support TLS
- //features.getAny().add(new Starttls());
-
- Mechanisms mechanisms = new Mechanisms();
-
- // TODO support SASL
- //mechanisms.getMechanism().add("DIGEST-MD5");
- //mechanisms.getMechanism().add("PLAIN");
- features.getAny().add(mechanisms);
- marshall(features);
+ writeOpenStream(brokerId, from);
}
catch (XMLStreamException e) {
throw IOExceptionSupport.create(e);
@@ -139,9 +126,14 @@
* Marshalls the given POJO to the client
*/
public void marshall(Object command) throws IOException {
+ if (isStopped() || isStopping()) {
+ log.warn("Not marshalling command as shutting down: " + command);
+ return;
+ }
try {
marshaller.marshal(command, xmlWriter);
xmlWriter.flush();
+ outputStream.flush();
}
catch (JAXBException e) {
throw IOExceptionSupport.create(e);
@@ -193,7 +185,8 @@
if (event.getEventType() == XMLEvent.END_ELEMENT) {
break;
}
- else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
+ else
+ if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
break;
}
else {
@@ -250,16 +243,30 @@
@Override
protected void initializeStreams() throws Exception {
// TODO it would be preferable to use class discovery here!
- context = JAXBContext.newInstance("jabber.client" + ":jabber.server"
- + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.gateway" + ":jabber.iq.last" + ":jabber.iq.oob"
- + ":jabber.iq.pass" + ":jabber.iq.roster" + ":jabber.iq.time" + ":jabber.iq.version"
- + ":org.jabber.etherx.streams" + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
+ context = JAXBContext.newInstance("jabber.client"
+ /*
+ + ":jabber.server"
+ + ":jabber.iq.gateway"
+ + ":jabber.iq.last"
+ + ":jabber.iq.oob"
+ + ":jabber.iq.pass"
+ + ":jabber.iq.time"
+ + ":jabber.iq.version"
+ + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
+ ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
+ + ":org.jabber.protocol.muc_admin"
+ + ":org.jabber.protocol.muc_unique"
+ */
+ + ":jabber.iq._private"
+ + ":jabber.iq.auth"
+ + ":jabber.iq.roster"
+ + ":org.jabber.etherx.streams"
+ ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
- + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_admin"
- + ":org.jabber.protocol.muc_unique" + ":org.jabber.protocol.muc_user"
+ + ":org.jabber.protocol.muc"
+ + ":org.jabber.protocol.muc_user"
+ ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
- + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
+ + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
+ );
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
@@ -270,6 +277,7 @@
}
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
+ log.debug("Sending initial stream element");
XMLOutputFactory factory = XMLOutputFactory.newInstance();
//factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
xmlWriter = factory.createXMLStreamWriter(outputStream);
@@ -286,7 +294,23 @@
}
xmlWriter.writeAttribute("to", to);
xmlWriter.writeAttribute("from", from);
- xmlWriter.writeCharacters("\n");
+
+
+ // now lets write the features
+ Features features = new Features();
+
+ // TODO support TLS
+ //features.getAny().add(new Starttls());
+
+ Mechanisms mechanisms = new Mechanisms();
+
+ // TODO support SASL
+ //mechanisms.getMechanism().add("DIGEST-MD5");
+ //mechanisms.getMechanism().add("PLAIN");
+ features.getAny().add(mechanisms);
+ marshall(features);
+
+ log.debug("Initial stream element sent!");
}
}
Modified: incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java?view=diff&rev=468026&r1=468025&r2=468026
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java (original)
+++ incubator/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java Thu Oct 26 07:59:11 2006
@@ -18,40 +18,25 @@
package org.apache.activemq.transport.xmpp;
import junit.framework.TestCase;
+import junit.textui.TestRunner;
import org.jivesoftware.smack.Chat;
-import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
/**
* @version $Revision$
*/
public class XmppTest extends TestCase {
+ protected static boolean block = false;
+
private XmppBroker broker = new XmppBroker();
- private boolean block = false;
public static void main(String[] args) {
- XmppTest test = new XmppTest();
- test.block = true;
- try {
- test.setUp();
- test.testConnect();
- }
- catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- }
- finally {
- try {
- test.tearDown();
- }
- catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- }
- }
-
+ block = true;
+ TestRunner.run(XmppTest.class);
}
+
public void testConnect() throws Exception {
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);
@@ -69,8 +54,13 @@
System.out.println("Sent all messages!");
}
catch (XMPPException e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
+ if (block) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ }
+ else {
+ throw e;
+ }
}
if (block) {
Thread.sleep(20000);