You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 19:31:07 UTC
svn commit: r786177 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/
activemq-openwire/src/main/java/org/apache/activemq/command/...
Author: chirino
Date: Thu Jun 18 17:31:06 2009
New Revision: 786177
URL: http://svn.apache.org/viewvc?rev=786177&view=rev
Log:
Working on making more tests pass in the BrokerTest class.
* Composite sending works now
* Temp domains added
* Creating destinations (even temp ones) is now supported in the openwire protocol
* Made the ActiveMQDestination classes implement the apollo Destination interface to avoid extra object allocations
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Thu Jun 18 17:31:06 2009
@@ -34,6 +34,8 @@
public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
+ public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
+ public static final AsciiBuffer TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
private VirtualHost virtualHost;
@@ -42,6 +44,8 @@
public Router() {
domains.put(QUEUE_DOMAIN, new QueueDomain());
domains.put(TOPIC_DOMAIN, new TopicDomain());
+ domains.put(TEMP_QUEUE_DOMAIN, new QueueDomain());
+ domains.put(TEMP_TOPIC_DOMAIN, new TopicDomain());
}
public Domain getDomain(AsciiBuffer name) {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu Jun 18 17:31:06 2009
@@ -33,7 +33,6 @@
static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
private final Message message;
- private Destination destination;
private AsciiBuffer producerId;
private OpenWireFormat storeWireFormat;
private PersistListener persistListener = null;
@@ -53,10 +52,7 @@
}
public Destination getDestination() {
- if (destination == null) {
- destination = OpenwireProtocolHandler.convert(message.getDestination());
- }
- return destination;
+ return message.getDestination();
}
public int getMemorySize() {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 17:31:06 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.openwire;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -30,7 +31,6 @@
import org.apache.activemq.apollo.broker.ProtocolHandler;
import org.apache.activemq.apollo.broker.Router;
import org.apache.activemq.apollo.broker.VirtualHost;
-import org.apache.activemq.apollo.broker.BrokerSubscription.UserAlreadyConnectedException;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
@@ -81,7 +81,6 @@
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
@@ -100,6 +99,8 @@
private Router router;
private VirtualHost host;
private final CommandVisitor visitor;
+
+ ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
public OpenwireProtocolHandler() {
setStoreWireFormat(new OpenWireFormat());
@@ -159,6 +160,10 @@
// Message Processing Methods.
// /////////////////////////////////////////////////////////////////
public Response processMessage(Message info) throws Exception {
+ if( info.getOriginalDestination() == null ) {
+ info.setOriginalDestination(info.getDestination());
+ }
+
ProducerId producerId = info.getProducerId();
ProducerContext producerContext = producers.get(producerId);
@@ -249,7 +254,14 @@
// Methods for server management
// /////////////////////////////////////////////////////////////////
public Response processAddDestination(DestinationInfo info) throws Exception {
- throw new UnsupportedOperationException();
+ ActiveMQDestination destination = info.getDestination();
+ if( destination.isTemporary() ) {
+ // Keep track of it so that we can remove it when this connection
+ // shuts down.
+ temporaryDestinations.add(destination);
+ }
+ host.createQueue(destination);
+ return ack(info);
}
public Response processRemoveDestination(DestinationInfo info) throws Exception {
@@ -577,7 +589,7 @@
* #getDestination()
*/
public Destination getDestination() {
- return convert(info.getDestination());
+ return info.getDestination();
}
/*
@@ -681,26 +693,6 @@
}
- static public Destination convert(ActiveMQDestination dest) {
- if (dest.isComposite()) {
- ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
- Destination.MultiDestination md = new Destination.MultiDestination();
- for (int i = 0; i < compositeDestinations.length; i++) {
- md.add(convert(compositeDestinations[i]));
- }
- return md;
- }
- AsciiBuffer domain;
- if (dest.isQueue()) {
- domain = Router.QUEUE_DOMAIN;
- } else if (dest.isTopic()) {
- domain = Router.TOPIC_DOMAIN;
- } else {
- throw new IllegalArgumentException("Unsupported domain type: " + dest);
- }
- return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
- }
-
private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
BooleanExpression rc = null;
if (info.getSelector() != null) {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Thu Jun 18 17:31:06 2009
@@ -22,6 +22,8 @@
import java.io.ObjectOutput;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -35,6 +37,7 @@
import javax.jms.Topic;
import org.apache.activemq.jndi.JNDIBaseStorable;
+import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -42,7 +45,7 @@
* @openwire:marshaller
* @version $Revision: 1.10 $
*/
-public abstract class ActiveMQDestination extends JNDIBaseStorable implements DataStructure, Destination, Externalizable, Comparable {
+public abstract class ActiveMQDestination extends JNDIBaseStorable implements DataStructure, Destination, Externalizable, Comparable, org.apache.activemq.apollo.broker.Destination {
public static final String PATH_SEPERATOR = ".";
public static final char COMPOSITE_SEPERATOR = ',';
@@ -62,7 +65,7 @@
private static final long serialVersionUID = -3885260014960795889L;
- protected String physicalName;
+ protected AsciiBuffer physicalName;
protected transient ActiveMQDestination[] compositeDestinations;
protected transient String[] destinationPaths;
@@ -81,6 +84,21 @@
setCompositeDestinations(composites);
}
+
+ public AsciiBuffer getName() {
+ return physicalName;
+ }
+
+ abstract public AsciiBuffer getDomain();
+
+ public Collection<org.apache.activemq.apollo.broker.Destination> getDestinations() {
+ if( !isComposite() ) {
+ return null;
+ }
+ List<org.apache.activemq.apollo.broker.Destination> t = (List)Arrays.asList(compositeDestinations);
+ return t;
+ }
+
// static helper methods for working with destinations
// -------------------------------------------------------------------------
@@ -185,12 +203,12 @@
sb.append(destinations[i].getQualifiedName());
}
}
- physicalName = sb.toString();
+ physicalName = new AsciiBuffer(sb.toString());
}
public String getQualifiedName() {
if (isComposite()) {
- return physicalName;
+ return physicalName.toString();
}
return getQualifiedPrefix() + physicalName;
}
@@ -201,16 +219,16 @@
* @openwire:property version=1
*/
public String getPhysicalName() {
- return physicalName;
+ return physicalName.toString();
}
- public void setPhysicalName(String physicalName) {
- final int len = physicalName.length();
+ public void setPhysicalName(String value) {
+ final int len = value.length();
// options offset
int p = -1;
boolean composite = false;
for (int i = 0; i < len; i++) {
- char c = physicalName.charAt(i);
+ char c = value.charAt(i);
if (c == '?') {
p = i;
break;
@@ -225,21 +243,21 @@
}
// Strip off any options
if (p >= 0) {
- String optstring = physicalName.substring(p + 1);
- physicalName = physicalName.substring(0, p);
+ String optstring = value.substring(p + 1);
+ value = value.substring(0, p);
try {
options = URISupport.parseQuery(optstring);
} catch (URISyntaxException e) {
- throw new IllegalArgumentException("Invalid destination name: " + physicalName + ", it's options are not encoded properly: " + e);
+ throw new IllegalArgumentException("Invalid destination name: " + value + ", it's options are not encoded properly: " + e);
}
}
- this.physicalName = physicalName;
+ this.physicalName = new AsciiBuffer(value);
this.destinationPaths = null;
this.hashValue = 0;
if (composite) {
// Check to see if it is a composite.
List<String> l = new ArrayList<String>();
- StringTokenizer iter = new StringTokenizer(physicalName, "" + COMPOSITE_SEPERATOR);
+ StringTokenizer iter = new StringTokenizer(value, "" + COMPOSITE_SEPERATOR);
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();
if (name.length() == 0) {
@@ -268,7 +286,7 @@
}
List<String> l = new ArrayList<String>();
- StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
+ StringTokenizer iter = new StringTokenizer(physicalName.toString(), PATH_SEPERATOR);
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();
if (name.length() == 0) {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java Thu Jun 18 17:31:06 2009
@@ -16,9 +16,15 @@
*/
package org.apache.activemq.command;
+import java.util.Collection;
+
import javax.jms.JMSException;
import javax.jms.Queue;
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
/**
*
* @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
@@ -59,4 +65,9 @@
return QUEUE_QUALIFIED_PREFIX;
}
+ @Override
+ public AsciiBuffer getDomain() {
+ return Router.QUEUE_DOMAIN;
+ }
+
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Thu Jun 18 17:31:06 2009
@@ -65,9 +65,10 @@
// Parse off the sequenceId off the end.
// this can fail if the temp destination is
// generated by another JMS system via the JMS<->JMS Bridge
- int p = this.physicalName.lastIndexOf(":");
+ String value = physicalName.toString();
+ int p = value.lastIndexOf(":");
if (p >= 0) {
- String seqStr = this.physicalName.substring(p + 1).trim();
+ String seqStr = value.substring(p + 1).trim();
if (seqStr != null && seqStr.length() > 0) {
try {
sequenceId = Integer.parseInt(seqStr);
@@ -75,7 +76,7 @@
LOG.debug("Did not parse sequence Id from " + physicalName);
}
// The rest should be the connection id.
- connectionId = this.physicalName.substring(0, p);
+ connectionId = value.substring(0, p);
}
}
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
/**
* @openwire:marshaller code="102"
* @version $Revision: 1.6 $
@@ -59,4 +62,9 @@
return TEMP_QUEUE_QUALIFED_PREFIX;
}
+ @Override
+ public AsciiBuffer getDomain() {
+ return Router.TEMP_QUEUE_DOMAIN;
+ }
+
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
/**
* @openwire:marshaller code="103"
* @version $Revision: 1.6 $
@@ -58,4 +61,9 @@
protected String getQualifiedPrefix() {
return TEMP_TOPIC_QUALIFED_PREFIX;
}
+
+ @Override
+ public AsciiBuffer getDomain() {
+ return Router.TEMP_TOPIC_DOMAIN;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
import javax.jms.JMSException;
import javax.jms.Topic;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
/**
* @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
* Destination"
@@ -57,4 +60,9 @@
return TOPIC_QUALIFIED_PREFIX;
}
+ @Override
+ public AsciiBuffer getDomain() {
+ return Router.TOPIC_DOMAIN;
+ }
+
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Thu Jun 18 17:31:06 2009
@@ -44,6 +44,74 @@
public boolean durableConsumer;
protected static final int MAX_NULL_WAIT=500;
+ public void initCombosForTestCompositeSend() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+ }
+
+ public void testCompositeSend() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
+ consumerInfo1.setRetroactive(true);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.request(consumerInfo1);
+
+ // Setup a second connection
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+
+ ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
+ consumerInfo2.setRetroactive(true);
+ consumerInfo2.setPrefetchSize(100);
+ connection2.request(consumerInfo2);
+
+ // Send the messages to the composite destination.
+ ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
+ destinationType);
+ for (int i = 0; i < 4; i++) {
+ connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode));
+ }
+
+ // The messages should have been delivered to both the A and B
+ // destination.
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ Message m2 = receiveMessage(connection2);
+
+ assertNotNull(m1);
+ assertNotNull(m2);
+
+ assertEquals(m1.getMessageId(), m2.getMessageId());
+ assertEquals(compositeDestination, m1.getOriginalDestination());
+ assertEquals(compositeDestination, m2.getOriginalDestination());
+
+ connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+ connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ }
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+
+ connection1.send(closeConnectionInfo(connectionInfo1));
+ connection2.send(closeConnectionInfo(connectionInfo2));
+ }
public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
@@ -331,167 +399,6 @@
connection.send(closeConnectionInfo(connectionInfo));
}
- public void initCombosForTestTransactedAckWithPrefetchOfOne() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
- }
-
- public void testTransactedAckWithPrefetchOfOne() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(1);
- connection1.send(consumerInfo1);
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(message);
- }
-
-
-
- // Now get the messages.
- for (int i = 0; i < 4; i++) {
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
- Message m1 = receiveMessage(connection1);
- assertNotNull(m1);
- MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
- ack.setTransactionId(txid);
- connection1.send(ack);
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
- }
- assertNoMessagesLeft(connection1);
- }
-
- public void initCombosForTestTransactedSend() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
- }
-
- public void testTransactedSend() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
-
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- message.setTransactionId(txid);
- connection1.request(message);
- }
-
- // The point of this test is that message should not be delivered until
- // send is committed.
- assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
-
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
- // Now get the messages.
- for (int i = 0; i < 4; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull(m1);
- }
-
- assertNoMessagesLeft(connection1);
- }
-
- public void initCombosForTestQueueTransactedAck() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
- }
-
- public void testQueueTransactedAck() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(message);
- }
-
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
-
- // Acknowledge the first 2 messages.
- for (int i = 0; i < 2; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull("m1 is null for index: " + i, m1);
- MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
- ack.setTransactionId(txid);
- connection1.request(ack);
- }
-
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
- // The queue should now only have the remaining 2 messages
- assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
- }
-
- public void initCombosForTestConsumerCloseCausesRedelivery() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
- }
public void testConsumerCloseCausesRedelivery() throws Exception {
@@ -711,52 +618,6 @@
addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
- public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- connectionInfo1.setClientId("A");
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- // Send the messages
- Message m = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(m);
-
- // Create the durable subscription.
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- if (durableConsumer) {
- consumerInfo1.setSubscriptionName("test");
- }
- consumerInfo1.setPrefetchSize(100);
- consumerInfo1.setRetroactive(true);
- connection1.send(consumerInfo1);
-
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.request(createMessage(producerInfo1, destination, deliveryMode));
-
- // the behavior is VERY dependent on the recovery policy used.
- // But the default broker settings try to make it as consistent as
- // possible
-
- // Subscription should see all messages sent.
- Message m2 = receiveMessage(connection1);
- assertNotNull(m2);
- assertEquals(m.getMessageId(), m2.getMessageId());
- for (int i = 0; i < 2; i++) {
- m2 = receiveMessage(connection1);
- assertNotNull(m2);
- }
-
- assertNoMessagesLeft(connection1);
- }
-
//
// TODO: need to reimplement this since we don't fail when we send to a
// non-existant
@@ -990,100 +851,32 @@
connection1.send(producerInfo1);
// setup the composite consumer.
- ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
- destinationType);
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
- consumerInfo1.setRetroactive(true);
- consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
-
- // Publish to the two destinations
- ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
- ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
-
- // Send a message to each destination .
- connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
- connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
-
- // The consumer should get both messages.
- for (int i = 0; i < 2; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull(m1);
- }
-
- assertNoMessagesLeft(connection1);
- connection1.send(closeConnectionInfo(connectionInfo1));
- }
-
- public void initCombosForTestCompositeSend() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
- }
-
- public void testCompositeSend() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
+ ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
+ destinationType);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
consumerInfo1.setRetroactive(true);
consumerInfo1.setPrefetchSize(100);
- connection1.request(consumerInfo1);
-
- // Setup a second connection
- StubConnection connection2 = createConnection();
- ConnectionInfo connectionInfo2 = createConnectionInfo();
- SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
- connection2.send(connectionInfo2);
- connection2.send(sessionInfo2);
+ connection1.send(consumerInfo1);
+ // Publish to the two destinations
+ ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
- ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
- consumerInfo2.setRetroactive(true);
- consumerInfo2.setPrefetchSize(100);
- connection2.request(consumerInfo2);
- // Send the messages to the composite destination.
- ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
- destinationType);
- for (int i = 0; i < 4; i++) {
- connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode));
- }
+ // Send a message to each destination .
+ connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
+ connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
- // The messages should have been delivered to both the A and B
- // destination.
- for (int i = 0; i < 4; i++) {
+ // The consumer should get both messages.
+ for (int i = 0; i < 2; i++) {
Message m1 = receiveMessage(connection1);
- Message m2 = receiveMessage(connection2);
-
assertNotNull(m1);
- assertNotNull(m2);
-
- assertEquals(m1.getMessageId(), m2.getMessageId());
- assertEquals(compositeDestination, m1.getOriginalDestination());
- assertEquals(compositeDestination, m2.getOriginalDestination());
-
- connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
- connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
-
}
assertNoMessagesLeft(connection1);
- assertNoMessagesLeft(connection2);
-
connection1.send(closeConnectionInfo(connectionInfo1));
- connection2.send(closeConnectionInfo(connectionInfo2));
}
+
public void initCombosForTestConnectionCloseCascades() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1739,6 +1532,215 @@
assertNotNull(m3);
connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
}
+
+ public void initCombosForTestTransactedAckWithPrefetchOfOne() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testTransactedAckWithPrefetchOfOne() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(1);
+ connection1.send(consumerInfo1);
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(message);
+ }
+
+
+
+ // Now get the messages.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection1.send(ack);
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+ }
+ assertNoMessagesLeft(connection1);
+ }
+
+ public void initCombosForTestTransactedSend() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testTransactedSend() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ message.setTransactionId(txid);
+ connection1.request(message);
+ }
+
+ // The point of this test is that messages should not be delivered until
+ // the send is committed.
+ assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
+
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+ // Now get the messages.
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ }
+
+ assertNoMessagesLeft(connection1);
+ }
+
+ public void initCombosForTestQueueTransactedAck() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+ }
+
+ public void testQueueTransactedAck() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(message);
+ }
+
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+ // Acknowledge the first 2 messages.
+ for (int i = 0; i < 2; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection1.request(ack);
+ }
+
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+ // The queue should now only have the remaining 2 messages
+ assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
+ }
+
+ public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ connectionInfo1.setClientId("A");
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ // Send the messages
+ Message m = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(m);
+
+ // Create the subscription (durable only when durableConsumer is set).
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ if (durableConsumer) {
+ consumerInfo1.setSubscriptionName("test");
+ }
+ consumerInfo1.setPrefetchSize(100);
+ consumerInfo1.setRetroactive(true);
+ connection1.send(consumerInfo1);
+
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+ connection1.request(createMessage(producerInfo1, destination, deliveryMode));
+
+ // the behavior is VERY dependent on the recovery policy used.
+ // But the default broker settings try to make it as consistent as
+ // possible
+
+ // Subscription should see all messages sent.
+ Message m2 = receiveMessage(connection1);
+ assertNotNull(m2);
+ assertEquals(m.getMessageId(), m2.getMessageId());
+ for (int i = 0; i < 2; i++) {
+ m2 = receiveMessage(connection1);
+ assertNotNull(m2);
+ }
+
+ assertNoMessagesLeft(connection1);
+ }
+
+ public void initCombosForTestConsumerCloseCausesRedelivery() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+ }
+
public static Test suite() {
return suite(BrokerTest.class);
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java Thu Jun 18 17:31:06 2009
@@ -4,7 +4,8 @@
final public class AsciiBuffer extends Buffer {
private int hashCode;
-
+ private String value;
+
public AsciiBuffer(Buffer other) {
super(other);
}
@@ -17,8 +18,9 @@
super(data);
}
- public AsciiBuffer(String input) {
- super(encode(input));
+ public AsciiBuffer(String value) {
+ super(encode(value));
+ this.value = value;
}
public AsciiBuffer compact() {
@@ -30,7 +32,10 @@
public String toString()
{
- return decode(this);
+ if( value == null ) {
+ value = decode(this);
+ }
+ return value;
}
@Override