You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2008/11/18 14:56:13 UTC
svn commit: r718590 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/transport/stomp/
activemq-core/src/test/java/org/apache/activemq/transport/stomp/
assembly/src/release/example/ assembly/src/release/example/src/
Author: dejanb
Date: Tue Nov 18 05:56:13 2008
New Revision: 718590
URL: http://svn.apache.org/viewvc?rev=718590&view=rev
Log:
fix for AMQ-2003
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
- copied, changed from r706270, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/assembly/src/release/example/src/StompExample.java (with props)
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
Modified:
activemq/trunk/assembly/src/release/example/build.xml
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (from r706270, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java&r1=706270&r2=718590&rev=718590&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue Nov 18 05:56:13 2008
@@ -18,11 +18,17 @@
package org.apache.activemq.transport.stomp;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
public class StompConnection {
@@ -53,12 +59,24 @@
outputStream.write(0);
outputStream.flush();
}
+
+ public StompFrame receive() throws Exception {
+ return receive(RECEIVE_TIMEOUT);
+ }
+
+ public StompFrame receive(long timeOut) throws Exception {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ StompWireFormat wf = new StompWireFormat();
+ DataInputStream dis = new DataInputStream(is);
+ return (StompFrame)wf.unmarshal(dis);
+ }
public String receiveFrame() throws Exception {
return receiveFrame(RECEIVE_TIMEOUT);
}
- private String receiveFrame(long timeOut) throws Exception {
+ public String receiveFrame(long timeOut) throws Exception {
stompSocket.setSoTimeout((int)timeOut);
InputStream is = stompSocket.getInputStream();
int c = 0;
@@ -87,5 +105,115 @@
public void setStompSocket(Socket stompSocket) {
this.stompSocket = stompSocket;
}
+
+ public void connect(String username, String password) throws Exception {
+ HashMap<String, String> headers = new HashMap();
+ headers.put("login", username);
+ headers.put("password", password);
+ StompFrame frame = new StompFrame("CONNECT", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void disconnect() throws Exception {
+ StompFrame frame = new StompFrame("DISCONNECT");
+ sendFrame(frame.toString());
+ }
+
+ public void send(String destination, String message) throws Exception {
+ send(destination, message, null);
+ }
+
+ public void send(String destination, String message, HashMap<String, String> headers) throws Exception {
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
+ sendFrame(frame.toString());
+ }
+
+ public void subscribe(String destination) throws Exception {
+ subscribe(destination, null, null);
+ }
+
+ public void subscribe(String destination, String ack) throws Exception {
+ subscribe(destination, ack, new HashMap<String, String>());
+ }
+
+ public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ if (ack != null) {
+ headers.put("ack", ack);
+ }
+ StompFrame frame = new StompFrame("SUBSCRIBE", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void unsubscribe(String destination) throws Exception {
+ unsubscribe(destination, null);
+ }
+
+ public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void begin(String transaction) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("BEGIN", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void abort(String transaction) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("ABORT", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void commit(String transaction) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("COMMIT", headers);
+ sendFrame(frame.toString());
+ }
+
+ public void ack(StompFrame frame) throws Exception {
+ ack(frame.getHeaders().get("message-id"), null);
+ }
+
+ public void ack(StompFrame frame, String transaction) throws Exception {
+ ack(frame.getHeaders().get("message-id"), transaction);
+ }
+
+ public void ack(String messageId) throws Exception {
+ ack(messageId, null);
+ }
+
+ public void ack(String messageId, String transaction) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("message-id", messageId);
+ if (transaction != null)
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("ACK", headers);
+ sendFrame(frame.toString());
+ }
+
+ protected String appendHeaders(HashMap<String, Object> headers) {
+ StringBuffer result = new StringBuffer();
+ for (String key : headers.keySet()) {
+ result.append(key + ":" + headers.get(key) + "\n");
+ }
+ result.append("\n");
+ return result.toString();
+ }
}
Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=718590&r1=718589&r2=718590&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Tue Nov 18 05:56:13 2008
@@ -290,4 +290,12 @@
</java>
</target>
+ <target name="stomp" depends="compile" description="Runs a Stomp example">
+ <echo>Running a Stomp example</echo>
+ <java classname="StompExample" fork="yes" maxmemory="100M">
+ <classpath refid="javac.classpath" />
+ <jvmarg value="-server" />
+ </java>
+ </target>
+
</project>
Added: activemq/trunk/assembly/src/release/example/src/StompExample.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/StompExample.java?rev=718590&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/StompExample.java (added)
+++ activemq/trunk/assembly/src/release/example/src/StompExample.java Tue Nov 18 05:56:13 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
+
+/**
+ *
+ * This example demonstrates Stomp Java API
+ *
+ * @version $Revision$
+ *
+ */
+public class StompExample {
+
+ public static void main(String args[]) throws Exception {
+ StompConnection connection = new StompConnection();
+ connection.open("localhost", 61613);
+
+ connection.connect("system", "manager");
+ StompFrame connect = connection.receive();
+ if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+ throw new Exception ("Not connected");
+ }
+
+ connection.begin("tx1");
+ connection.send("/queue/test", "message1");
+ connection.send("/queue/test", "message2");
+ connection.commit("tx1");
+
+ connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);
+
+ connection.begin("tx2");
+
+ StompFrame message = connection.receive();
+ System.out.println(message.getBody());
+ connection.ack(message, "tx2");
+
+ message = connection.receive();
+ System.out.println(message.getBody());
+ connection.ack(message, "tx2");
+
+ connection.commit("tx2");
+
+ connection.disconnect();
+ }
+
+}
Propchange: activemq/trunk/assembly/src/release/example/src/StompExample.java
------------------------------------------------------------------------------
svn:eol-style = native