You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2008/11/18 14:56:13 UTC

svn commit: r718590 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-core/src/test/java/org/apache/activemq/transport/stomp/ assembly/src/release/example/ assembly/src/release/example/src/

Author: dejanb
Date: Tue Nov 18 05:56:13 2008
New Revision: 718590

URL: http://svn.apache.org/viewvc?rev=718590&view=rev
Log:
fix for AMQ-2003

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
      - copied, changed from r706270, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/assembly/src/release/example/src/StompExample.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
Modified:
    activemq/trunk/assembly/src/release/example/build.xml

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (from r706270, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java&r1=706270&r2=718590&rev=718590&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue Nov 18 05:56:13 2008
@@ -18,11 +18,17 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 
 public class StompConnection {
 
@@ -53,12 +59,24 @@
         outputStream.write(0);
         outputStream.flush();
     }
+    
+    /**
+     * Reads the next frame using the default {@code RECEIVE_TIMEOUT}.
+     *
+     * @return the frame returned by {@link #receive(long)}
+     * @throws Exception if {@link #receive(long)} fails
+     */
+    public StompFrame receive() throws Exception {
+        StompFrame next = receive(RECEIVE_TIMEOUT);
+        return next;
+    }
+    
+    /**
+     * Reads one frame from the socket and decodes it with a new
+     * {@link StompWireFormat}.
+     *
+     * @param timeOut read timeout applied to the socket; it is narrowed to
+     *        {@code int} before being passed to {@code setSoTimeout}
+     * @return the unmarshalled frame
+     * @throws Exception if the socket or the wire format reports an error
+     */
+    public StompFrame receive(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int)timeOut);
+        InputStream raw = stompSocket.getInputStream();
+        Object decoded = new StompWireFormat().unmarshal(new DataInputStream(raw));
+        return (StompFrame)decoded;
+    }
 
     public String receiveFrame() throws Exception {
         return receiveFrame(RECEIVE_TIMEOUT);
     }
 
-    private String receiveFrame(long timeOut) throws Exception {
+    public String receiveFrame(long timeOut) throws Exception {
         stompSocket.setSoTimeout((int)timeOut);
         InputStream is = stompSocket.getInputStream();
         int c = 0;
@@ -87,5 +105,115 @@
 	public void setStompSocket(Socket stompSocket) {
 		this.stompSocket = stompSocket;
 	}
+	
+    /**
+     * Sends a CONNECT frame carrying the given credentials. The method only
+     * writes the frame; it does not read the broker's reply.
+     * <p>
+     * STOMP names the credential headers {@code login} and {@code passcode},
+     * so the password is sent under {@code passcode}.
+     *
+     * @param username value of the {@code login} header
+     * @param password value of the {@code passcode} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void connect(String username, String password) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("login", username);
+        headers.put("passcode", password);
+        StompFrame frame = new StompFrame("CONNECT", headers);
+        sendFrame(frame.toString());
+    }
+    
+    /**
+     * Sends a DISCONNECT frame without headers or body.
+     *
+     * @throws Exception if the frame cannot be sent
+     */
+    public void disconnect() throws Exception {
+        sendFrame(new StompFrame("DISCONNECT").toString());
+    }
+    
+    /**
+     * Sends a text message with no caller-supplied headers.
+     *
+     * @param destination value of the {@code destination} header
+     * @param message text used as the frame body
+     * @throws Exception if the frame cannot be sent
+     */
+    public void send(String destination, String message) throws Exception {
+        HashMap<String, String> noHeaders = null;
+        send(destination, message, noHeaders);
+    }
+	
+    /**
+     * Sends a SEND frame to the given destination.
+     * <p>
+     * The {@code destination} entry is written into the supplied map, so a
+     * non-null {@code headers} argument is modified in place.
+     *
+     * @param destination value stored under the {@code destination} header
+     * @param message text used as the frame body; converted with
+     *        {@code String.getBytes()}, i.e. the platform default charset
+     * @param headers extra headers, or {@code null} for none
+     * @throws Exception if the frame cannot be sent
+     */
+    public void send(String destination, String message, HashMap<String, String> headers) throws Exception {
+        HashMap<String, String> frameHeaders = (headers != null) ? headers : new HashMap<String, String>();
+        frameHeaders.put("destination", destination);
+        byte[] body = message.getBytes();
+        sendFrame(new StompFrame("SEND", frameHeaders, body).toString());
+    }
+    
+    /**
+     * Subscribes to a destination without an {@code ack} header and without
+     * extra headers.
+     *
+     * @param destination value of the {@code destination} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void subscribe(String destination) throws Exception {
+        String noAck = null;
+        HashMap<String, String> noHeaders = null;
+        subscribe(destination, noAck, noHeaders);
+    }
+    
+    /**
+     * Subscribes to a destination with the given acknowledgement mode.
+     *
+     * @param destination value of the {@code destination} header
+     * @param ack value of the {@code ack} header; omitted when {@code null}
+     * @throws Exception if the frame cannot be sent
+     */
+    public void subscribe(String destination, String ack) throws Exception {
+        HashMap<String, String> emptyHeaders = new HashMap<String, String>();
+        subscribe(destination, ack, emptyHeaders);
+    }
+    
+    /**
+     * Sends a SUBSCRIBE frame.
+     * <p>
+     * The {@code destination} and, when given, {@code ack} entries are written
+     * into the supplied map, so a non-null {@code headers} argument is
+     * modified in place.
+     *
+     * @param destination value stored under the {@code destination} header
+     * @param ack value stored under the {@code ack} header; skipped when
+     *        {@code null}
+     * @param headers extra headers, or {@code null} for none
+     * @throws Exception if the frame cannot be sent
+     */
+    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
+        HashMap<String, String> frameHeaders = (headers != null) ? headers : new HashMap<String, String>();
+        frameHeaders.put("destination", destination);
+        if (ack != null) {
+            frameHeaders.put("ack", ack);
+        }
+        sendFrame(new StompFrame("SUBSCRIBE", frameHeaders).toString());
+    }
+    
+    /**
+     * Unsubscribes from a destination without extra headers.
+     *
+     * @param destination value of the {@code destination} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void unsubscribe(String destination) throws Exception {
+        HashMap<String, String> noHeaders = null;
+        unsubscribe(destination, noHeaders);
+    }
+    
+    /**
+     * Sends an UNSUBSCRIBE frame.
+     * <p>
+     * The {@code destination} entry is written into the supplied map, so a
+     * non-null {@code headers} argument is modified in place.
+     *
+     * @param destination value stored under the {@code destination} header
+     * @param headers extra headers, or {@code null} for none
+     * @throws Exception if the frame cannot be sent
+     */
+    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
+        HashMap<String, String> frameHeaders = (headers != null) ? headers : new HashMap<String, String>();
+        frameHeaders.put("destination", destination);
+        sendFrame(new StompFrame("UNSUBSCRIBE", frameHeaders).toString());
+    }
+    
+    /**
+     * Sends a BEGIN frame for the named transaction.
+     *
+     * @param transaction value of the {@code transaction} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void begin(String transaction) throws Exception {
+        HashMap<String, String> txHeaders = new HashMap<String, String>();
+        txHeaders.put("transaction", transaction);
+        sendFrame(new StompFrame("BEGIN", txHeaders).toString());
+    }
+    
+    /**
+     * Sends an ABORT frame for the named transaction.
+     *
+     * @param transaction value of the {@code transaction} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void abort(String transaction) throws Exception {
+        HashMap<String, String> txHeaders = new HashMap<String, String>();
+        txHeaders.put("transaction", transaction);
+        sendFrame(new StompFrame("ABORT", txHeaders).toString());
+    }
+    
+    /**
+     * Sends a COMMIT frame for the named transaction.
+     *
+     * @param transaction value of the {@code transaction} header
+     * @throws Exception if the frame cannot be sent
+     */
+    public void commit(String transaction) throws Exception {
+        HashMap<String, String> txHeaders = new HashMap<String, String>();
+        txHeaders.put("transaction", transaction);
+        sendFrame(new StompFrame("COMMIT", txHeaders).toString());
+    }
+    
+    /**
+     * Acknowledges a received frame outside of any transaction, using the
+     * frame's {@code message-id} header.
+     *
+     * @param frame frame whose {@code message-id} header is acknowledged
+     * @throws Exception if the ACK frame cannot be sent
+     */
+    public void ack(StompFrame frame) throws Exception {
+        String messageId = frame.getHeaders().get("message-id");
+        String noTransaction = null;
+        ack(messageId, noTransaction);
+    }
+    
+    /**
+     * Acknowledges a received frame, optionally within a transaction, using
+     * the frame's {@code message-id} header.
+     *
+     * @param frame frame whose {@code message-id} header is acknowledged
+     * @param transaction transaction name, or {@code null} for none
+     * @throws Exception if the ACK frame cannot be sent
+     */
+    public void ack(StompFrame frame, String transaction) throws Exception {
+        String messageId = frame.getHeaders().get("message-id");
+        ack(messageId, transaction);
+    }
+    
+    /**
+     * Acknowledges a message id outside of any transaction.
+     *
+     * @param messageId value of the {@code message-id} header
+     * @throws Exception if the ACK frame cannot be sent
+     */
+    public void ack(String messageId) throws Exception {
+        String noTransaction = null;
+        ack(messageId, noTransaction);
+    }
+    
+    /**
+     * Sends an ACK frame for a message id.
+     *
+     * @param messageId value of the {@code message-id} header
+     * @param transaction value of the {@code transaction} header; the header
+     *        is left out when this is {@code null}
+     * @throws Exception if the frame cannot be sent
+     */
+    public void ack(String messageId, String transaction) throws Exception {
+        HashMap<String, String> ackHeaders = new HashMap<String, String>();
+        ackHeaders.put("message-id", messageId);
+        if (transaction != null) {
+            ackHeaders.put("transaction", transaction);
+        }
+        sendFrame(new StompFrame("ACK", ackHeaders).toString());
+    }
+    
+    /**
+     * Renders headers as {@code key:value} lines followed by one empty line.
+     * <p>
+     * Each entry becomes {@code key + ":" + value + "\n"}, in the map's
+     * iteration order, and a final {@code "\n"} ends the header section.
+     *
+     * @param headers headers to render; must not be {@code null}
+     * @return the rendered header section
+     */
+    protected String appendHeaders(HashMap<String, Object> headers) {
+        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's locking.
+        StringBuilder result = new StringBuilder();
+        // entrySet() yields key and value together, saving a lookup per key.
+        for (java.util.Map.Entry<String, Object> header : headers.entrySet()) {
+            result.append(header.getKey()).append(':').append(header.getValue()).append('\n');
+        }
+        result.append('\n');
+        return result.toString();
+    }
 
 }

Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=718590&r1=718589&r2=718590&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Tue Nov 18 05:56:13 2008
@@ -290,4 +290,12 @@
 		</java>
 	</target>	
 	
+	<!-- Runs the StompExample class in a forked JVM (100M max heap, -server flag) on the javac.classpath path; depends on the compile target -->
+	<target name="stomp" depends="compile" description="Runs a Stomp example">
+		<echo>Running a Stomp example</echo>
+		<java classname="StompExample" fork="yes" maxmemory="100M">
+			<classpath refid="javac.classpath" />
+			<jvmarg value="-server" />
+		</java>
+	</target>
+	
 </project>

Added: activemq/trunk/assembly/src/release/example/src/StompExample.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/StompExample.java?rev=718590&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/StompExample.java (added)
+++ activemq/trunk/assembly/src/release/example/src/StompExample.java Tue Nov 18 05:56:13 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
+
+/**
+ * Walk-through of the {@code StompConnection} client API.
+ * <p>
+ * The example connects to {@code localhost:61613} as {@code system}/{@code manager},
+ * sends two messages to {@code /queue/test} inside transaction {@code tx1},
+ * then subscribes with client acknowledgement and acknowledges both messages
+ * inside transaction {@code tx2}.
+ *
+ * @version $Revision$
+ */
+public class StompExample {
+
+	/**
+	 * Runs the example against a broker listening on {@code localhost:61613}.
+	 *
+	 * @param args ignored
+	 * @throws Exception if the broker does not answer with CONNECTED or any
+	 *         connection call fails
+	 */
+	public static void main(String[] args) throws Exception {
+		StompConnection connection = new StompConnection();
+		connection.open("localhost", 61613);
+
+		connection.connect("system", "manager");
+		StompFrame connect = connection.receive();
+		if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+			throw new Exception ("Not connected");
+		}
+
+		// A SEND frame only joins a transaction when it carries a
+		// "transaction" header, so the header is passed explicitly.
+		java.util.HashMap<String, String> tx1Headers = new java.util.HashMap<String, String>();
+		tx1Headers.put("transaction", "tx1");
+
+		connection.begin("tx1");
+		connection.send("/queue/test", "message1", tx1Headers);
+		connection.send("/queue/test", "message2", tx1Headers);
+		connection.commit("tx1");
+
+		connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);
+
+		connection.begin("tx2");
+
+		// Receive and acknowledge the two messages within tx2.
+		StompFrame message = connection.receive();
+		System.out.println(message.getBody());
+		connection.ack(message, "tx2");
+
+		message = connection.receive();
+		System.out.println(message.getBody());
+		connection.ack(message, "tx2");
+
+		connection.commit("tx2");
+
+		connection.disconnect();
+	}
+
+}

Propchange: activemq/trunk/assembly/src/release/example/src/StompExample.java
------------------------------------------------------------------------------
    svn:eol-style = native