You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/15 21:12:28 UTC

svn commit: r704995 [1/3] - in /activemq/sandbox/chirino-pb/activemq-core: ./ src/main/java/org/apache/activemq/advisory/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/pbwire/ src...

Author: chirino
Date: Wed Oct 15 12:12:26 2008
New Revision: 704995

URL: http://svn.apache.org/viewvc?rev=704995&view=rev
Log:
First pass of a protocol buffer based marshalling impl

Added:
    activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java   (with props)
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java   (with props)
    activemq/sandbox/chirino-pb/activemq-core/src/main/proto/
    activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto
    activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb
    activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/
    activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java
    activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java
Modified:
    activemq/sandbox/chirino-pb/activemq-core/pom.xml
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
    activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
    activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java

Added: activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt (added)
+++ activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt Wed Oct 15 12:12:26 2008
@@ -0,0 +1,13 @@
+=======================================================================
+ Implementation Notes for PB Wire Format
+=======================================================================
+
+- Need to measure the actual number of byte[] copies that are done for each
+  marshall and unmarshall.  I suspect that we will need custom
+  CodedInputStream/CodedOutputStream implementations so we can do more
+  efficient scatter/gather style byte[] handling.
+  
+- There are several objects which just wrap a String value; we might
+  need to consider refactoring them out since the wrapper adds no value:
+   * BrokerId
+   * ConnectionId
\ No newline at end of file

Modified: activemq/sandbox/chirino-pb/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/pom.xml?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/pom.xml (original)
+++ activemq/sandbox/chirino-pb/activemq-core/pom.xml Wed Oct 15 12:12:26 2008
@@ -91,6 +91,13 @@
       <artifactId>camel-jms</artifactId>
       <optional>true</optional>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.activemq.protobuf</groupId>
+      <artifactId>activemq-protobuf</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>        
+
 
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
@@ -343,6 +350,20 @@
     
   <build>
     <plugins>
+    
+      <plugin>
+        <groupId>org.apache.activemq.protobuf</groupId>
+        <artifactId>activemq-protobuf</artifactId>
+        <version>1.0-SNAPSHOT</version>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    
       <!-- Configure which tests are included/excuded -->
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Oct 15 12:12:26 2008
@@ -418,7 +418,9 @@
             advisoryMessage.setPersistent(false);
             advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
             advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
-            advisoryMessage.setTargetConsumerId(targetConsumerId);
+            if( targetConsumerId!=null ) {
+                advisoryMessage.setTargetConsumerId(targetConsumerId);
+            }
             advisoryMessage.setDestination(topic);
             advisoryMessage.setResponseRequired(false);
             advisoryMessage.setProducerId(advisoryProducerId);

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Oct 15 12:12:26 2008
@@ -209,6 +209,7 @@
     }
 
     public void serviceTransportException(IOException e) {
+        LOG.warn("Transport failed:",e);
     	BrokerService bService=connector.getBrokerService();
     	if(bService.isShutdownOnSlaveFailure()){
 	    	if(brokerInfo!=null){

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Wed Oct 15 12:12:26 2008
@@ -33,6 +33,7 @@
 import javax.jms.MessageNotReadableException;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -99,6 +100,14 @@
     protected transient DataInputStream dataIn;
     protected transient int length;
 
+    public ActiveMQBytesMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.BYTES_MESSAGE));
+    }
+    
+    public ActiveMQBytesMessage(PBMessage message) {
+        super(message);
+    }
+
     public Message copy() {
         ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
         copy(copy);
@@ -123,7 +132,7 @@
             if (dataOut != null) {
                 dataOut.close();
                 ByteSequence bs = bytesOut.toByteSequence();
-                if (compressed) {
+                if (isCompressed()) {
                     int pos = bs.offset;
                     ByteSequenceData.writeIntBig(bs, length);
                     bs.offset = pos;
@@ -784,7 +793,7 @@
                     throw JMSExceptionSupport.create(e);
                 }
                 length = 0;
-                compressed = true;
+                setCompressed(true);
                 Deflater deflater = new Deflater(Deflater.BEST_SPEED);
                 os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
                     public void write(byte[] arg0) throws IOException {

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Wed Oct 15 12:12:26 2008
@@ -62,26 +62,27 @@
 
     private static final long serialVersionUID = -3885260014960795889L;
 
-    protected String physicalName;
-
     protected transient ActiveMQDestination[] compositeDestinations;
     protected transient String[] destinationPaths;
     protected transient boolean isPattern;
     protected transient int hashValue;
     protected Map<String, String> options;
+    protected PBActiveMQDestination pb;
     
-    public ActiveMQDestination() {
-    }
 
-    protected ActiveMQDestination(String name) {
-        setPhysicalName(name);
-    }
-
-    public ActiveMQDestination(ActiveMQDestination composites[]) {
+    public ActiveMQDestination(PBActiveMQDestination pb, ActiveMQDestination composites[]) {
+        this(pb);
         setCompositeDestinations(composites);
     }
 
 
+    public ActiveMQDestination(PBActiveMQDestination pb) {
+        this.pb = pb;
+        if( pb.hasName() ) {
+            setPhysicalName(pb.getName());
+        }
+    }
+
     // static helper methods for working with destinations
     // -------------------------------------------------------------------------
     public static ActiveMQDestination createDestination(String name, byte defaultType) {
@@ -185,14 +186,14 @@
                 sb.append(destinations[i].getQualifiedName());
             }
         }
-        physicalName = sb.toString();
+        pb.setName(sb.toString());
     }
 
     public String getQualifiedName() {
         if (isComposite()) {
-            return physicalName;
+            return pb.getName();
         }
-        return getQualifiedPrefix() + physicalName;
+        return getQualifiedPrefix() + pb.getName();
     }
 
     protected abstract String getQualifiedPrefix();
@@ -201,7 +202,7 @@
      * @openwire:property version=1
      */
     public String getPhysicalName() {
-        return physicalName;
+        return pb.getName();
     }
 
     public void setPhysicalName(String physicalName) {
@@ -233,7 +234,7 @@
                 throw new IllegalArgumentException("Invalid destination name: " + physicalName + ", it's options are not encoded properly: " + e);
             }
         }
-        this.physicalName = physicalName;
+        this.pb.setName(physicalName);
         this.destinationPaths = null;
         this.hashValue = 0;
         if (composite) {
@@ -268,7 +269,7 @@
         }
 
         List<String> l = new ArrayList<String>();
-        StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
+        StringTokenizer iter = new StringTokenizer(pb.getName(), PATH_SEPERATOR);
         while (iter.hasMoreTokens()) {
             String name = iter.nextToken().trim();
             if (name.length() == 0) {
@@ -305,12 +306,12 @@
         }
 
         ActiveMQDestination d = (ActiveMQDestination)o;
-        return physicalName.equals(d.physicalName);
+        return d.pb.equals(pb);
     }
 
     public int hashCode() {
         if (hashValue == 0) {
-            hashValue = physicalName.hashCode();
+            hashValue = pb.hashCode();
         }
         return hashValue;
     }
@@ -368,4 +369,8 @@
     public boolean isPattern() {
         return isPattern;
     }
+
+    public PBActiveMQDestination getPB() {
+        return pb;
+    }
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Wed Oct 15 12:12:26 2008
@@ -34,6 +34,7 @@
 import javax.jms.MessageNotWriteableException;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -101,6 +102,14 @@
 
     protected transient Map<String, Object> map = new HashMap<String, Object>();
 
+    public ActiveMQMapMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.MAP_MESSAGE));
+    }
+
+    public ActiveMQMapMessage(PBMessage message) {
+        super(message);
+    }
+
     public Message copy() {
         ActiveMQMapMessage copy = new ActiveMQMapMessage();
         copy(copy);
@@ -125,7 +134,7 @@
                 OutputStream os = bytesOut;
                 ActiveMQConnection connection = getConnection();
                 if (connection != null && connection.isUseCompression()) {
-                    compressed = true;
+                    setCompressed(true);
                     os = new DeflaterOutputStream(os);
                 }
                 DataOutputStream dataOut = new DataOutputStream(os);

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Wed Oct 15 12:12:26 2008
@@ -32,6 +32,7 @@
 import javax.jms.MessageNotWriteableException;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.filter.PropertyExpression;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.Callback;
@@ -49,6 +50,14 @@
 
     protected transient Callback acknowledgeCallback;
 
+    public ActiveMQMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.EMPTY_MESSAGE));
+    }
+    
+    public ActiveMQMessage(PBMessage message) {
+        super(message);
+    }
+
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -131,7 +140,7 @@
                 // so lets set the IDs to be 1
                 MessageId id = new MessageId();
                 id.setTextView(value);
-                this.setMessageId(messageId);
+                this.setMessageId(id);
             }
         } else {
             this.setMessageId(null);
@@ -638,4 +647,5 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processMessage(this);
     }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Wed Oct 15 12:12:26 2008
@@ -31,6 +31,7 @@
 import javax.jms.ObjectMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -70,6 +71,14 @@
 
     protected transient Serializable object;
 
+    public ActiveMQObjectMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.OBJECT_MESSAGE));
+    }
+
+    public ActiveMQObjectMessage(PBMessage message) {
+        super(message);
+    }
+
     public Message copy() {
         ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
         copy(copy);
@@ -90,7 +99,7 @@
                 OutputStream os = bytesOut;
                 ActiveMQConnection connection = getConnection();
                 if (connection != null && connection.isUseCompression()) {
-                    compressed = true;
+                    setCompressed(true);
                     os = new DeflaterOutputStream(os);
                 }
                 DataOutputStream dataOut = new DataOutputStream(os);

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
 import javax.jms.JMSException;
 import javax.jms.Queue;
 
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
 /**
  * 
  * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
@@ -33,10 +35,15 @@
     private static final long serialVersionUID = -3885260014960795889L;
 
     public ActiveMQQueue() {
+        super(new PBActiveMQDestination().setType(DestinationType.QUEUE));
     }
 
     public ActiveMQQueue(String name) {
-        super(name);
+        super(new PBActiveMQDestination().setName(name).setType(DestinationType.QUEUE));
+    }
+
+    public ActiveMQQueue(PBActiveMQDestination pb) {
+        super(pb);
     }
 
     public byte getDataStructureType() {

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Wed Oct 15 12:12:26 2008
@@ -35,6 +35,7 @@
 import javax.jms.StreamMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -118,6 +119,14 @@
     protected transient DataInputStream dataIn;
     protected transient int remainingBytes = -1;
 
+    public ActiveMQStreamMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.STREAM_MESSAGE));
+    }
+
+    public ActiveMQStreamMessage(PBMessage message) {
+        super(message);
+    }
+
     public Message copy() {
         ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
         copy(copy);
@@ -1113,7 +1122,7 @@
             OutputStream os = bytesOut;
             ActiveMQConnection connection = getConnection();
             if (connection != null && connection.isUseCompression()) {
-                compressed = true;
+                setCompressed(true);
                 os = new DeflaterOutputStream(os);
             }
             this.dataOut = new DataOutputStream(os);

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Wed Oct 15 12:12:26 2008
@@ -32,15 +32,13 @@
     protected transient String connectionId;
     protected transient int sequenceId;
 
-    public ActiveMQTempDestination() {
-    }
 
-    public ActiveMQTempDestination(String name) {
-        super(name);
+    public ActiveMQTempDestination(PBActiveMQDestination pb, ActiveMQDestination[] composites) {
+        super(pb, composites);
     }
 
-    public ActiveMQTempDestination(String connectionId, long sequenceId) {
-        super(connectionId + ":" + sequenceId);
+    public ActiveMQTempDestination(PBActiveMQDestination pb) {
+        super(pb);
     }
 
     public boolean isTemporary() {
@@ -65,9 +63,9 @@
             // Parse off the sequenceId off the end.
             // this can fail if the temp destination is
             // generated by another JMS system via the JMS<->JMS Bridge
-            int p = this.physicalName.lastIndexOf(":");
+            int p = this.pb.getName().lastIndexOf(":");
             if (p >= 0) {
-                String seqStr = this.physicalName.substring(p + 1).trim();
+                String seqStr = this.pb.getName().substring(p + 1).trim();
                 if (seqStr != null && seqStr.length() > 0) {
                     try {
                         sequenceId = Integer.parseInt(seqStr);
@@ -75,7 +73,7 @@
                         LOG.debug("Did not parse sequence Id from " + physicalName);
                     }
                     // The rest should be the connection id.
-                    connectionId = this.physicalName.substring(0, p);
+                    connectionId = this.pb.getName().substring(0, p);
                 }
             }
         }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
 /**
  * @openwire:marshaller code="102"
  * @version $Revision: 1.6 $
@@ -29,14 +31,19 @@
     private static final long serialVersionUID = 6683049467527633867L;
 
     public ActiveMQTempQueue() {
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE));
     }
 
     public ActiveMQTempQueue(String name) {
-        super(name);
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE).setName(name));
     }
 
     public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
-        super(connectionId.getValue(), sequenceId);
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE).setName(connectionId+":"+sequenceId));
+    }
+
+    public ActiveMQTempQueue(PBActiveMQDestination destination) {
+        super(destination);
     }
 
     public byte getDataStructureType() {

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
 import javax.jms.JMSException;
 import javax.jms.TemporaryTopic;
 
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
 /**
  * @openwire:marshaller code="103"
  * @version $Revision: 1.6 $
@@ -29,14 +31,19 @@
     private static final long serialVersionUID = -4325596784597300253L;
 
     public ActiveMQTempTopic() {
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC));
     }
 
     public ActiveMQTempTopic(String name) {
-        super(name);
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC).setName(name));
     }
 
     public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
-        super(connectionId.getValue(), sequenceId);
+        super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC).setName(connectionId.getValue()+":"+sequenceId)); // explicit getValue(): do not depend on ConnectionId.toString()
+    }
+
+    public ActiveMQTempTopic(PBActiveMQDestination destination) {
+        super(destination);
     }
 
     public byte getDataStructureType() {

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Wed Oct 15 12:12:26 2008
@@ -29,6 +29,7 @@
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -46,6 +47,15 @@
 
     protected String text;
 
+    
+    public ActiveMQTextMessage() {
+        super(new PBMessage().setMessageType(PBMessageType.TEXT_MESSAGE));
+    }
+
+    public ActiveMQTextMessage(PBMessage message) {
+        super(message);
+    }
+    
     public Message copy() {
         ActiveMQTextMessage copy = new ActiveMQTextMessage();
         copy(copy);
@@ -110,7 +120,7 @@
             OutputStream os = bytesOut;
             ActiveMQConnection connection = getConnection();
             if (connection != null && connection.isUseCompression()) {
-                compressed = true;
+                setCompressed(true);
                 os = new DeflaterOutputStream(os);
             }
             DataOutputStream dataOut = new DataOutputStream(os);
@@ -137,7 +147,7 @@
     }
 
     public int getSize() {
-        if (size == 0 && content == null && text != null) {
+        if (size == 0 && !pb.hasContent() && text != null) {
             size = getMinimumMessageSize();
             if (marshalledProperties != null) {
                 size += marshalledProperties.getLength();

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
 /**
  * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
  *                         Destination"
@@ -31,10 +33,15 @@
     private static final long serialVersionUID = 7300307405896488588L;
 
     public ActiveMQTopic() {
+        super(new PBActiveMQDestination().setType(DestinationType.TOPIC));
     }
 
     public ActiveMQTopic(String name) {
-        super(name);
+        super(new PBActiveMQDestination().setType(DestinationType.TOPIC).setName(name));
+    }
+
+    public ActiveMQTopic(PBActiveMQDestination destination) {
+        super(destination);
     }
 
     public byte getDataStructureType() {

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Wed Oct 15 12:12:26 2008
@@ -60,7 +60,7 @@
     }
 
     public String toString() {
-        return IntrospectionSupport.toString(this, BaseCommand.class);
+        return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
     }
     
     public boolean isWireFormatInfo() {
@@ -121,5 +121,8 @@
         this.to = to;
     }
     
+    public Command getCommandObject() {
+        return this;
+    }
     
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.command;
 
+import java.util.ArrayList;
+
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -28,19 +30,16 @@
  * @version $Revision: 1.7 $
  */
 public class BrokerInfo extends BaseCommand {
+    
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
-    BrokerId brokerId;
-    String brokerURL;
-    boolean slaveBroker;
-    boolean masterBroker;
-    boolean faultTolerantConfiguration;
-    boolean networkConnection;
-    boolean duplexConnection;
-    BrokerInfo peerBrokerInfos[];
-    String brokerName;
-    long connectionId;
-    String brokerUploadUrl;
-    String networkProperties;
+    PBBrokerInfo pb = new PBBrokerInfo();
+    
+    public BrokerInfo() {
+    }
+    
+    public BrokerInfo(PBBrokerInfo pb) {
+        this.pb = pb;
+    }
 
     public boolean isBrokerInfo() {
         return true;
@@ -54,44 +53,70 @@
      * @openwire:property version=1 cache=true
      */
     public BrokerId getBrokerId() {
-        return brokerId;
+        if( pb.hasBrokerId() ) {
+            return new BrokerId(pb.getBrokerId());
+        }
+        return null;
     }
 
     public void setBrokerId(BrokerId brokerId) {
-        this.brokerId = brokerId;
+        if( brokerId==null ) {
+            pb.clearBrokerId();
+        } else {
+            this.pb.setBrokerId(brokerId.getValue());
+        }
     }
 
     /**
      * @openwire:property version=1
      */
     public String getBrokerURL() {
-        return brokerURL;
+        return pb.getBrokerUrl();
     }
 
     public void setBrokerURL(String brokerURL) {
-        this.brokerURL = brokerURL;
+        this.pb.setBrokerUrl(brokerURL);
     }
 
     /**
      * @openwire:property version=1 testSize=0
      */
     public BrokerInfo[] getPeerBrokerInfos() {
-        return peerBrokerInfos;
+        if( !pb.hasPeerBrokerInfos() ) {
+            return null;
+        }
+        BrokerInfo rc[]= new BrokerInfo[pb.getPeerBrokerInfosCount()];
+        for (int i = 0; i < rc.length; i++) {
+            rc[i] = new BrokerInfo(pb.getPeerBrokerInfos(i));
+        }
+        return rc;
     }
 
     public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
-        this.peerBrokerInfos = peerBrokerInfos;
+        if( peerBrokerInfos == null ) {
+            pb.clearPeerBrokerInfos();
+        } else {
+            ArrayList<PBBrokerInfo> t = new ArrayList<PBBrokerInfo>(peerBrokerInfos.length);
+            for (int i = 0; i < peerBrokerInfos.length; i++) {
+                t.add(peerBrokerInfos[i].getPB());
+            }
+            pb.setPeerBrokerInfosList(t);
+        }
+    }
+
+    public PBBrokerInfo getPB() {
+        return pb;
     }
 
     /**
      * @openwire:property version=1
      */
     public String getBrokerName() {
-        return brokerName;
+        return pb.getBrokerName();
     }
 
     public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
+        this.pb.setBrokerName(brokerName);
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
@@ -102,25 +127,25 @@
      * @openwire:property version=1
      */
     public boolean isSlaveBroker() {
-        return slaveBroker;
+        return pb.getSlaveBroker();
     }
 
     public void setSlaveBroker(boolean slaveBroker) {
-        this.slaveBroker = slaveBroker;
+        this.pb.setSlaveBroker(slaveBroker);
     }
 
     /**
      * @openwire:property version=1
      */
     public boolean isMasterBroker() {
-        return masterBroker;
+        return pb.getMasterBroker();
     }
 
     /**
      * @param masterBroker The masterBroker to set.
      */
     public void setMasterBroker(boolean masterBroker) {
-        this.masterBroker = masterBroker;
+        this.pb.setMasterBroker(masterBroker);
     }
 
     /**
@@ -128,14 +153,14 @@
      * @return Returns the faultTolerantConfiguration.
      */
     public boolean isFaultTolerantConfiguration() {
-        return faultTolerantConfiguration;
+        return pb.getFaultTolerantConfiguration();
     }
 
     /**
      * @param faultTolerantConfiguration The faultTolerantConfiguration to set.
      */
     public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration) {
-        this.faultTolerantConfiguration = faultTolerantConfiguration;
+        this.pb.setFaultTolerantConfiguration(faultTolerantConfiguration);
     }
 
     /**
@@ -143,14 +168,14 @@
      * @return the duplexConnection
      */
     public boolean isDuplexConnection() {
-        return this.duplexConnection;
+        return this.pb.getDuplexConnection();
     }
 
     /**
      * @param duplexConnection the duplexConnection to set
      */
     public void setDuplexConnection(boolean duplexConnection) {
-        this.duplexConnection = duplexConnection;
+        this.pb.setDuplexConnection(duplexConnection);
     }
 
     /**
@@ -158,14 +183,14 @@
      * @return the networkConnection
      */
     public boolean isNetworkConnection() {
-        return this.networkConnection;
+        return this.pb.getNetworkConnection();
     }
 
     /**
      * @param networkConnection the networkConnection to set
      */
     public void setNetworkConnection(boolean networkConnection) {
-        this.networkConnection = networkConnection;
+        this.pb.setNetworkConnection(networkConnection);
     }
 
     /**
@@ -174,11 +199,11 @@
      * @openwire:property version=2
      */
     public long getConnectionId() {
-        return connectionId;
+        return pb.getConnectionId();
     }
 
     public void setConnectionId(long connectionId) {
-        this.connectionId = connectionId;
+        this.pb.setConnectionId(connectionId);
     }
 
     /**
@@ -188,11 +213,11 @@
      * @openwire:property version=3
      */
     public String getBrokerUploadUrl() {
-        return brokerUploadUrl;
+        return pb.getBrokerUploadUrl();
     }
 
     public void setBrokerUploadUrl(String brokerUploadUrl) {
-        this.brokerUploadUrl = brokerUploadUrl;
+        this.pb.setBrokerUploadUrl(brokerUploadUrl); // must target the upload-URL field read by getBrokerUploadUrl(), not brokerUrl
     }
 
     /**
@@ -200,13 +225,17 @@
      * @return the networkProperties
      */
     public String getNetworkProperties() {
-        return this.networkProperties;
+        return this.pb.getNetworkProperties();
     }
 
     /**
      * @param networkProperties the networkProperties to set
      */
     public void setNetworkProperties(String networkProperties) {
-        this.networkProperties = networkProperties;
+        this.pb.setNetworkProperties(networkProperties);
+    }
+
+    public PBBrokerInfo getPBCommand() {
+        return pb;
     }
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java Wed Oct 15 12:12:26 2008
@@ -24,7 +24,7 @@
  * 
  * @version $Revision: 1.7 $
  */
-public interface Command extends DataStructure {
+public interface Command extends DataStructure, ProtocolBufferBacked {
 
     void setCommandId(int value);
 
@@ -70,4 +70,5 @@
     Endpoint getTo();
 
     void setTo(Endpoint to);
+    
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Wed Oct 15 12:12:26 2008
@@ -26,11 +26,15 @@
  */
 public class ConnectionControl extends BaseCommand {
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_CONTROL;
-    protected boolean suspend;
-    protected boolean resume;
-    protected boolean close;
-    protected boolean exit;
-    protected boolean faultTolerant;
+
+    PBConnectionControl pb = new PBConnectionControl();
+    
+    public ConnectionControl() {
+    }
+    
+    public ConnectionControl(PBConnectionControl pb) {
+        this.pb = pb;
+    }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -45,14 +49,14 @@
      * @return Returns the close.
      */
     public boolean isClose() {
-        return close;
+        return pb.getClose();
     }
 
     /**
      * @param close The close to set.
      */
     public void setClose(boolean close) {
-        this.close = close;
+        this.pb.setClose(close);
     }
 
     /**
@@ -60,14 +64,14 @@
      * @return Returns the exit.
      */
     public boolean isExit() {
-        return exit;
+        return pb.getExit();
     }
 
     /**
      * @param exit The exit to set.
      */
     public void setExit(boolean exit) {
-        this.exit = exit;
+        this.pb.setExit(exit);
     }
 
     /**
@@ -75,14 +79,14 @@
      * @return Returns the faultTolerant.
      */
     public boolean isFaultTolerant() {
-        return faultTolerant;
+        return pb.getFaultTolerant();
     }
 
     /**
      * @param faultTolerant The faultTolerant to set.
      */
     public void setFaultTolerant(boolean faultTolerant) {
-        this.faultTolerant = faultTolerant;
+        this.pb.setFaultTolerant(faultTolerant);
     }
 
     /**
@@ -90,14 +94,14 @@
      * @return Returns the resume.
      */
     public boolean isResume() {
-        return resume;
+        return pb.getResume();
     }
 
     /**
      * @param resume The resume to set.
      */
     public void setResume(boolean resume) {
-        this.resume = resume;
+        this.pb.setResume(resume);
     }
 
     /**
@@ -105,13 +109,17 @@
      * @return Returns the suspend.
      */
     public boolean isSuspend() {
-        return suspend;
+        return pb.getSuspend();
     }
 
     /**
      * @param suspend The suspend to set.
      */
     public void setSuspend(boolean suspend) {
-        this.suspend = suspend;
+        this.pb.setSuspend(suspend);
+    }
+
+    public PBConnectionControl getPBCommand() {
+        return pb;
     }
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -27,8 +28,14 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_ERROR;
 
-    private ConnectionId connectionId;
-    private Throwable exception;
+    PBConnectionError pb = new PBConnectionError();
+
+    public ConnectionError() {
+    }
+    
+    public ConnectionError(PBConnectionError pb) {
+        this.pb=pb;
+    }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -42,22 +49,40 @@
      * @openwire:property version=1
      */
     public Throwable getException() {
-        return exception;
+        if( pb.hasException() ) {
+            return PBConversionSupport.convert(pb.getException(), true);
+        }
+        return null;
     }
 
     public void setException(Throwable exception) {
-        this.exception = exception;
+        if( exception == null ) {
+            pb.clearException();
+        } else {
+            this.pb.setException(PBConversionSupport.convert(exception, true));
+        }
     }
 
     /**
      * @openwire:property version=1
      */
     public ConnectionId getConnectionId() {
-        return connectionId;
+        if( pb.hasConnectionId() ) {
+            return new ConnectionId(pb.getConnectionId());
+        }
+        return null;
     }
 
     public void setConnectionId(ConnectionId connectionId) {
-        this.connectionId = connectionId;
+        if( connectionId==null ) {
+            pb.clearConnectionId();
+        } else {
+            this.pb.setConnectionId(connectionId.getValue());
+        }
+    }
+
+    public PBConnectionError getPBCommand() {
+        return pb;
     }
 
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.command;
 
+import java.util.ArrayList;
+
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -27,21 +30,19 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_INFO;
 
-    protected ConnectionId connectionId;
-    protected String clientId;
-    protected String userName;
-    protected String password;
-    protected BrokerId[] brokerPath;
-    protected boolean brokerMasterConnector;
-    protected boolean manageable;
-    protected boolean clientMaster = true;
+    protected PBConnectionInfo pb = new PBConnectionInfo();
+    
     protected transient Object transportContext;
 
     public ConnectionInfo() {
     }
 
     public ConnectionInfo(ConnectionId connectionId) {
-        this.connectionId = connectionId;
+        setConnectionId(connectionId);
+    }
+
+    public ConnectionInfo(PBConnectionInfo pb) {
+        this.pb=pb;
     }
 
     public byte getDataStructureType() {
@@ -50,34 +51,36 @@
 
     public void copy(ConnectionInfo copy) {
         super.copy(copy);
-        copy.clientId = clientId;
-        copy.userName = userName;
-        copy.password = password;
-        copy.brokerPath = brokerPath;
-        copy.brokerMasterConnector = brokerMasterConnector;
-        copy.manageable = manageable;
+        copy.pb = pb.clone(); // populate the target from this instance (not the reverse), cloned so the two do not share state
     }
 
     /**
      * @openwire:property version=1 cache=true
      */
     public ConnectionId getConnectionId() {
-        return connectionId;
+        if( pb.hasConnectionId() ) {
+            return new ConnectionId(pb.getConnectionId());
+        }
+        return null;
     }
 
     public void setConnectionId(ConnectionId connectionId) {
-        this.connectionId = connectionId;
+        if( connectionId==null ) {
+            pb.clearConnectionId();
+        } else {
+            this.pb.setConnectionId(connectionId.getValue());
+        }
     }
 
     /**
      * @openwire:property version=1
      */
     public String getClientId() {
-        return clientId;
+        return pb.getClientId();
     }
 
     public void setClientId(String clientId) {
-        this.clientId = clientId;
+        this.pb.setClientId(clientId);
     }
 
     public RemoveInfo createRemoveCommand() {
@@ -90,22 +93,22 @@
      * @openwire:property version=1
      */
     public String getPassword() {
-        return password;
+        return pb.getPassword();
     }
 
     public void setPassword(String password) {
-        this.password = password;
+        this.pb.setPassword(password);
     }
 
     /**
      * @openwire:property version=1
      */
     public String getUserName() {
-        return userName;
+        return pb.getUserName();
     }
 
     public void setUserName(String userName) {
-        this.userName = userName;
+        this.pb.setUserName(userName);
     }
 
     /**
@@ -114,13 +117,23 @@
      * @openwire:property version=1 cache=true
      */
     public BrokerId[] getBrokerPath() {
-        return brokerPath;
+        BrokerId rc[]=null;
+        if( pb.hasBrokerPath() ) {
+            rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+        }
+        return rc;
     }
 
     public void setBrokerPath(BrokerId[] brokerPath) {
-        this.brokerPath = brokerPath;
+        if( brokerPath!=null ) {
+            ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+            pb.setBrokerPathList(rc);
+        } else {
+            pb.clearBrokerPath();
+        }
     }
 
+
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processAddConnection(this);
     }
@@ -129,28 +142,28 @@
      * @openwire:property version=1
      */
     public boolean isBrokerMasterConnector() {
-        return brokerMasterConnector;
+        return pb.getBrokerMasterConnector();
     }
 
     /**
      * @param brokerMasterConnector The brokerMasterConnector to set.
      */
-    public void setBrokerMasterConnector(boolean slaveBroker) {
-        this.brokerMasterConnector = slaveBroker;
+    public void setBrokerMasterConnector(boolean brokerMasterConnector) {
+        this.pb.setBrokerMasterConnector(brokerMasterConnector);
     }
 
     /**
      * @openwire:property version=1
      */
     public boolean isManageable() {
-        return manageable;
+        return pb.getManageable();
     }
 
     /**
      * @param manageable The manageable to set.
      */
     public void setManageable(boolean manageable) {
-        this.manageable = manageable;
+        this.pb.setManageable(manageable);
     }
 
     /**
@@ -180,14 +193,18 @@
      * @return the clientMaster
      */
     public boolean isClientMaster() {
-        return this.clientMaster;
+        return this.pb.getClientMaster();
     }
 
     /**
      * @param clientMaster the clientMaster to set
      */
     public void setClientMaster(boolean clientMaster) {
-        this.clientMaster = clientMaster;
+        this.pb.setClientMaster(clientMaster);
+    }
+
+    public PBConnectionInfo getPBCommand() {
+        return pb;
     }
 
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Wed Oct 15 12:12:26 2008
@@ -28,12 +28,7 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL;
 
-    protected ConsumerId consumerId;
-    protected boolean close;
-    protected boolean stop;
-    protected boolean start;
-    protected boolean flush;
-    protected int prefetch;
+    protected PBConsumerControl pb = new PBConsumerControl();
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -48,14 +43,14 @@
      * @return Returns the close.
      */
     public boolean isClose() {
-        return close;
+        return pb.getClose();
     }
 
     /**
      * @param close The close to set.
      */
     public void setClose(boolean close) {
-        this.close = close;
+        this.pb.setClose(close);
     }
 
     /**
@@ -63,14 +58,21 @@
      * @return Returns the consumerId.
      */
     public ConsumerId getConsumerId() {
-        return consumerId;
+        if( pb.hasConsumerId() ) {
+            return new ConsumerId(pb.getConsumerId());
+        }
+        return null;
     }
 
     /**
      * @param consumerId The consumerId to set.
      */
     public void setConsumerId(ConsumerId consumerId) {
-        this.consumerId = consumerId;
+        if( consumerId==null ) {
+            pb.clearConsumerId();
+        } else {
+            this.pb.setConsumerId(consumerId.getPB());
+        }
     }
 
     /**
@@ -78,14 +80,14 @@
      * @return Returns the prefetch.
      */
     public int getPrefetch() {
-        return prefetch;
+        return pb.getPrefetch();
     }
 
     /**
      * @param prefetch The prefetch to set.
      */
     public void setPrefetch(int prefetch) {
-        this.prefetch = prefetch;
+        this.pb.setPrefetch(prefetch);
     }
 
     /**
@@ -93,14 +95,14 @@
      * @return the flush
      */
     public boolean isFlush() {
-        return this.flush;
+        return this.pb.getFlush();
     }
 
     /**
      * @param flush the flush to set
      */
     public void setFlush(boolean flush) {
-        this.flush = flush;
+        this.pb.setFlush(flush);
     }
 
     /**
@@ -108,14 +110,14 @@
      * @return the start
      */
     public boolean isStart() {
-        return this.start;
+        return this.pb.getStart();
     }
 
     /**
      * @param start the start to set
      */
     public void setStart(boolean start) {
-        this.start = start;
+        this.pb.setStart(start);
     }
 
     /**
@@ -123,13 +125,17 @@
      * @return the stop
      */
     public boolean isStop() {
-        return this.stop;
+        return this.pb.getStop();
     }
 
     /**
      * @param stop the stop to set
      */
     public void setStop(boolean stop) {
-        this.stop = stop;
+        this.pb.setStop(stop);
+    }
+
+    public PBConsumerControl getPBCommand() {
+        return pb;
     }
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.pbwire.PBCommand;
+import org.apache.activemq.protobuf.BaseMessage;
+
 /**
  * @openwire:marshaller code="122"
  * @version $Revision$
@@ -24,27 +27,30 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_ID;
 
-    protected String connectionId;
-    protected long sessionId;
-    protected long value;
-
     protected transient int hashCode;
     protected transient String key;
     protected transient SessionId parentId;
 
+	private PBConsumerId pb = new PBConsumerId();
+
     public ConsumerId() {
     }
 
     public ConsumerId(SessionId sessionId, long consumerId) {
-        this.connectionId = sessionId.getConnectionId();
-        this.sessionId = sessionId.getValue();
-        this.value = consumerId;
+    	setSessionId(sessionId);
+    	setValue(consumerId);
+    }
+
+    public void setSessionId(SessionId sessionId) {
+        pb.setSessionId(sessionId.getPB());
+    }
+
+    public ConsumerId(PBConsumerId id) {
+    	this.pb = id;
     }
 
     public ConsumerId(ConsumerId id) {
-        this.connectionId = id.getConnectionId();
-        this.sessionId = id.getSessionId();
-        this.value = id.getValue();
+    	this.pb = id.getPB().clone();
     }
 
     public SessionId getParentId() {
@@ -56,7 +62,7 @@
 
     public int hashCode() {
         if (hashCode == 0) {
-            hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value;
+            hashCode = pb.hashCode();
         }
         return hashCode;
     }
@@ -69,7 +75,7 @@
             return false;
         }
         ConsumerId id = (ConsumerId)o;
-        return sessionId == id.sessionId && value == id.value && connectionId.equals(id.connectionId);
+        return pb.equals(id.pb);
     }
 
     public byte getDataStructureType() {
@@ -78,7 +84,7 @@
 
     public String toString() {
         if (key == null) {
-            key = connectionId + ":" + sessionId + ":" + value;
+            key = getConnectionId() + ":" + getSessionId() + ":" + getValue();
         }
         return key;
     }
@@ -87,36 +93,40 @@
      * @openwire:property version=1
      */
     public String getConnectionId() {
-        return connectionId;
+        return pb.getSessionId().getConnectionId();
     }
 
     public void setConnectionId(String connectionId) {
-        this.connectionId = connectionId;
+        this.pb.getSessionId().setConnectionId(connectionId);
     }
 
     /**
      * @openwire:property version=1
      */
     public long getSessionId() {
-        return sessionId;
+        return pb.getSessionId().getId();
     }
 
     public void setSessionId(long sessionId) {
-        this.sessionId = sessionId;
+        this.pb.getSessionId().setId(sessionId);
     }
 
     /**
      * @openwire:property version=1
      */
     public long getValue() {
-        return value;
+        return this.pb.getId();
     }
 
     public void setValue(long consumerId) {
-        this.value = consumerId;
+    	this.pb.setId(consumerId);
     }
 
     public boolean isMarshallAware() {
         return false;
     }
+
+	public PBConsumerId getPB() {
+		return pb;
+	}
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Wed Oct 15 12:12:26 2008
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -35,30 +36,16 @@
     public static final byte NETWORK_CONSUMER_PRIORITY = -5;
     public static final byte LOW_PRIORITY = -10;
 
-    protected ConsumerId consumerId;
-    protected ActiveMQDestination destination;
-    protected int prefetchSize;
-    protected int maximumPendingMessageLimit;
-    protected boolean browser;
-    protected boolean dispatchAsync;
-    protected String selector;
-    protected String subscriptionName;
-    protected boolean noLocal;
-    protected boolean exclusive;
-    protected boolean retroactive;
-    protected byte priority;
-    protected BrokerId[] brokerPath;
-    protected boolean optimizedAcknowledge;
+    PBConsumerInfo pb = new PBConsumerInfo(); 
+    
     // used by the broker
     protected transient int currentPrefetchSize;
-    // if true, the consumer will not send range
-    protected boolean noRangeAcks;
-    // acks.
-
     protected BooleanExpression additionalPredicate;
     protected transient boolean networkSubscription; // this subscription
     protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
 
+    private ActiveMQDestination destination;
+
     // originated from a
     // network connection
 
@@ -66,11 +53,15 @@
     }
 
     public ConsumerInfo(ConsumerId consumerId) {
-        this.consumerId = consumerId;
+        this.setConsumerId(consumerId);
     }
 
     public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
-        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
+        this(new ConsumerId(sessionInfo.getSessionId(), consumerId));
+    }
+
+    public ConsumerInfo(PBConsumerInfo pb) {
+        this.pb=pb;
     }
 
     public ConsumerInfo copy() {
@@ -81,19 +72,7 @@
 
     public void copy(ConsumerInfo info) {
         super.copy(info);
-        info.consumerId = consumerId;
-        info.destination = destination;
-        info.prefetchSize = prefetchSize;
-        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
-        info.browser = browser;
-        info.dispatchAsync = dispatchAsync;
-        info.selector = selector;
-        info.subscriptionName = subscriptionName;
-        info.noLocal = noLocal;
-        info.exclusive = exclusive;
-        info.retroactive = retroactive;
-        info.priority = priority;
-        info.brokerPath = brokerPath;
+        info.pb = pb.clone(); // copy this command's state INTO info, like every other field below
         info.networkSubscription = networkSubscription;
         if (networkConsumerIds != null) {
             if (info.networkConsumerIds==null){
@@ -104,7 +83,7 @@
     }
 
     public boolean isDurable() {
-        return subscriptionName != null;
+        return getSubscriptionName() != null;
     }
 
     public byte getDataStructureType() {
@@ -117,11 +96,18 @@
      * @openwire:property version=1 cache=true
      */
     public ConsumerId getConsumerId() {
-        return consumerId;
+        if( pb.hasConsumerId() ) {
+            return new ConsumerId(pb.getConsumerId());
+        }
+        return null;
     }
 
     public void setConsumerId(ConsumerId consumerId) {
-        this.consumerId = consumerId;
+        if( consumerId==null ) {
+            pb.clearConsumerId();
+        } else {
+            this.pb.setConsumerId(consumerId.getPB());
+        }
     }
 
     /**
@@ -130,11 +116,11 @@
      * @openwire:property version=1
      */
     public boolean isBrowser() {
-        return browser;
+        return pb.getBrowser();
     }
 
     public void setBrowser(boolean browser) {
-        this.browser = browser;
+        this.pb.setBrowser(browser);
     }
 
     /**
@@ -144,11 +130,19 @@
      * @openwire:property version=1 cache=true
      */
     public ActiveMQDestination getDestination() {
+        if( pb.hasDestination() ) {
+            if( destination==null ) {
+                destination = PBConversionSupport.convert(pb.getDestination());
+            }
+        } else {
+            destination = null;
+        }
         return destination;
     }
 
     public void setDestination(ActiveMQDestination destination) {
         this.destination = destination;
+        if( destination==null ) { pb.clearDestination(); } else { pb.setDestination(destination.getPB()); } // null clears the field
     }
 
     /**
@@ -158,11 +152,11 @@
      * @openwire:property version=1
      */
     public int getPrefetchSize() {
-        return prefetchSize;
+        return pb.getPrefetchSize();
     }
 
     public void setPrefetchSize(int prefetchSize) {
-        this.prefetchSize = prefetchSize;
+        this.pb.setPrefetchSize(prefetchSize);
         this.currentPrefetchSize = prefetchSize;
     }
 
@@ -173,11 +167,11 @@
      * @openwire:property version=1
      */
     public int getMaximumPendingMessageLimit() {
-        return maximumPendingMessageLimit;
+        return pb.getMaximumPendingMessageLimit();
     }
 
     public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
-        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
+        this.pb.setMaximumPendingMessageLimit(maximumPendingMessageLimit);
     }
 
     /**
@@ -190,11 +184,11 @@
      * @openwire:property version=1
      */
     public boolean isDispatchAsync() {
-        return dispatchAsync;
+        return pb.getDispatchAsync();
     }
 
     public void setDispatchAsync(boolean dispatchAsync) {
-        this.dispatchAsync = dispatchAsync;
+        this.pb.setDispatchAsync(dispatchAsync);
     }
 
     /**
@@ -204,11 +198,11 @@
      * @openwire:property version=1
      */
     public String getSelector() {
-        return selector;
+        return pb.getSelector();
     }
 
     public void setSelector(String selector) {
-        this.selector = selector;
+        this.pb.setSelector(selector);
     }
 
     /**
@@ -217,11 +211,11 @@
      * @openwire:property version=1
      */
     public String getSubscriptionName() {
-        return subscriptionName;
+        return pb.getSubscriptionName();
     }
 
-    public void setSubscriptionName(String durableSubscriptionId) {
-        this.subscriptionName = durableSubscriptionId;
+    public void setSubscriptionName(String subscriptionName) {
+        this.pb.setSubscriptionName(subscriptionName);
     }
 
     /**
@@ -230,7 +224,7 @@
      * @see getSubscriptionName
      */
     public String getSubcriptionName() {
-        return subscriptionName;
+        return getSubscriptionName();
     }
 
     /**
@@ -238,8 +232,8 @@
      * @see setSubscriptionName
      * @param durableSubscriptionId
      */
-    public void setSubcriptionName(String durableSubscriptionId) {
-        this.subscriptionName = durableSubscriptionId;
+    public void setSubcriptionName(String subscriptionName) {
+        setSubscriptionName(subscriptionName);
     }
 
     /**
@@ -249,11 +243,11 @@
      * @openwire:property version=1
      */
     public boolean isNoLocal() {
-        return noLocal;
+        return pb.getNoLocal();
     }
 
     public void setNoLocal(boolean noLocal) {
-        this.noLocal = noLocal;
+        this.pb.setNoLocal(noLocal);
     }
 
     /**
@@ -265,11 +259,11 @@
      * @openwire:property version=1
      */
     public boolean isExclusive() {
-        return exclusive;
+        return pb.getExclusive();
     }
 
     public void setExclusive(boolean exclusive) {
-        this.exclusive = exclusive;
+        this.pb.setExclusive(exclusive);
     }
 
     /**
@@ -283,11 +277,11 @@
      * @openwire:property version=1
      */
     public boolean isRetroactive() {
-        return retroactive;
+        return pb.getRetroactive();
     }
 
     public void setRetroactive(boolean retroactive) {
-        this.retroactive = retroactive;
+        this.pb.setRetroactive(retroactive);
     }
 
     public RemoveInfo createRemoveCommand() {
@@ -305,11 +299,11 @@
      * @openwire:property version=1
      */
     public byte getPriority() {
-        return priority;
+        return (byte) pb.getPriority();
     }
 
     public void setPriority(byte priority) {
-        this.priority = priority;
+        this.pb.setPriority(priority);
     }
 
     /**
@@ -318,11 +312,20 @@
      * @openwire:property version=1 cache=true
      */
     public BrokerId[] getBrokerPath() {
-        return brokerPath;
+        BrokerId rc[]=null;
+        if( pb.hasBrokerPath() ) {
+            rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+        }
+        return rc;
     }
 
     public void setBrokerPath(BrokerId[] brokerPath) {
-        this.brokerPath = brokerPath;
+        if( brokerPath!=null ) {
+            ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+            pb.setBrokerPathList(rc);
+        } else {
+            pb.clearBrokerPath();
+        }
     }
 
     /**
@@ -365,14 +368,14 @@
      * @return Returns the optimizedAcknowledge.
      */
     public boolean isOptimizedAcknowledge() {
-        return optimizedAcknowledge;
+        return pb.getOptimizedAcknowledge();
     }
 
     /**
      * @param optimizedAcknowledge The optimizedAcknowledge to set.
      */
     public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
-        this.optimizedAcknowledge = optimizedAcknowledge;
+        this.pb.setOptimizedAcknowledge(optimizedAcknowledge);
     }
 
     /**
@@ -397,11 +400,11 @@
      * @openwire:property version=1
      */
     public boolean isNoRangeAcks() {
-        return noRangeAcks;
+        return pb.getNoRangeAcks();
     }
 
     public void setNoRangeAcks(boolean noRangeAcks) {
-        this.noRangeAcks = noRangeAcks;
+        this.pb.setNoRangeAcks(noRangeAcks);
     }
 
     public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
@@ -432,4 +435,8 @@
         return result;
     }
 
+    public PBConsumerInfo getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java Wed Oct 15 12:12:26 2008
@@ -29,7 +29,7 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONTROL_COMMAND;
 
-    private String command;
+    private PBControlCommand pb = new PBControlCommand(); // must be non-null: getCommand()/setCommand() dereference it
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -39,14 +39,18 @@
      * @openwire:property version=1
      */
     public String getCommand() {
-        return command;
+        return pb.getCommand();
     }
 
     public void setCommand(String command) {
-        this.command = command;
+        this.pb.setCommand(command);
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processControlCommand(this);
     }
+
+    public PBControlCommand getPBCommand() {
+        return pb;
+    }
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.command;
 
+
 /**
  * @version $Revision$
  */
@@ -26,5 +27,5 @@
      */
     byte getDataStructureType();
     boolean isMarshallAware();
-    
+        
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java Wed Oct 15 12:12:26 2008
@@ -17,7 +17,10 @@
 package org.apache.activemq.command;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.activemq.command.PBDestinationInfo.PBDestinationOperationType;
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -33,19 +36,21 @@
     public static final byte ADD_OPERATION_TYPE = 0;
     public static final byte REMOVE_OPERATION_TYPE = 1;
 
-    protected ConnectionId connectionId;
-    protected ActiveMQDestination destination;
-    protected byte operationType;
-    protected long timeout;
-    protected BrokerId[] brokerPath;
+    protected PBDestinationInfo pb = new PBDestinationInfo();
+
+    private ActiveMQDestination destination;
 
     public DestinationInfo() {
     }
 
     public DestinationInfo(ConnectionId connectionId, byte operationType, ActiveMQDestination destination) {
-        this.connectionId = connectionId;
-        this.operationType = operationType;
-        this.destination = destination;
+        setConnectionId(connectionId);
+        setOperationType(operationType);
+        setDestination(destination);
+    }
+
+    public DestinationInfo(PBDestinationInfo pb) {
+        this.pb=pb;
     }
 
     public byte getDataStructureType() {
@@ -53,32 +58,47 @@
     }
 
     public boolean isAddOperation() {
-        return operationType == ADD_OPERATION_TYPE;
+        return pb.getOperationType() == PBDestinationInfo.PBDestinationOperationType.ADD_OPERATION_TYPE;
     }
 
     public boolean isRemoveOperation() {
-        return operationType == REMOVE_OPERATION_TYPE;
+        return pb.getOperationType() == PBDestinationInfo.PBDestinationOperationType.REMOVE_OPERATION_TYPE;
     }
 
     /**
      * @openwire:property version=1 cache=true
      */
     public ConnectionId getConnectionId() {
-        return connectionId;
+        if( pb.hasConnectionId() ) {
+            return new ConnectionId(pb.getConnectionId());
+        }
+        return null;
     }
 
     public void setConnectionId(ConnectionId connectionId) {
-        this.connectionId = connectionId;
+        if( connectionId == null ) {
+            pb.clearConnectionId();
+        } else {
+            this.pb.setConnectionId(connectionId.getValue());
+        }
     }
 
     /**
      * @openwire:property version=1 cache=true
      */
     public ActiveMQDestination getDestination() {
+        if( pb.hasDestination() ) {
+            if( destination==null ) {
+                destination = PBConversionSupport.convert(pb.getDestination());
+            }
+        } else {
+            destination = null;
+        }
         return destination;
     }
 
     public void setDestination(ActiveMQDestination destination) {
+        if( destination==null ) { pb.clearDestination(); } else { pb.setDestination(destination.getPB()); } // null clears the field
         this.destination = destination;
     }
 
@@ -86,22 +106,22 @@
      * @openwire:property version=1
      */
     public byte getOperationType() {
-        return operationType;
+        return (byte) pb.getOperationType().getNumber();
     }
 
     public void setOperationType(byte operationType) {
-        this.operationType = operationType;
+        this.pb.setOperationType(PBDestinationOperationType.valueOf(operationType));
     }
 
     /**
      * @openwire:property version=1
      */
     public long getTimeout() {
-        return timeout;
+        return pb.getTimeout();
     }
 
     public void setTimeout(long timeout) {
-        this.timeout = timeout;
+        this.pb.setTimeout(timeout);
     }
 
     /**
@@ -110,11 +130,20 @@
      * @openwire:property version=1 cache=true
      */
     public BrokerId[] getBrokerPath() {
-        return brokerPath;
+        BrokerId rc[]=null;
+        if( pb.hasBrokerPath() ) {
+            rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+        }
+        return rc;
     }
 
     public void setBrokerPath(BrokerId[] brokerPath) {
-        this.brokerPath = brokerPath;
+        if( brokerPath!=null ) {
+            ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+            pb.setBrokerPathList(rc);
+        } else {
+            pb.clearBrokerPath();
+        }
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
@@ -126,4 +155,8 @@
         throw new IOException("Unknown operation type: " + getOperationType());
     }
 
+    public PBDestinationInfo getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java Wed Oct 15 12:12:26 2008
@@ -29,6 +29,8 @@
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.FLUSH_COMMAND;
     public static final Command COMMAND = new FlushCommand();
 
+    protected PBFlushCommand pb = new PBFlushCommand();
+    
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -37,4 +39,8 @@
         return visitor.processFlush(this);
     }
 
+    public PBFlushCommand getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java Wed Oct 15 12:12:26 2008
@@ -16,18 +16,26 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  * @openwire:marshaller code="52"
  * @version $Revision$
  */
-public class JournalQueueAck implements DataStructure {
+public class JournalQueueAck implements DataStructure, ProtocolBufferBacked {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_REMOVE;
 
-    ActiveMQDestination destination;
-    MessageAck messageAck;
+    final PBJournalQueueAck pb;
+    
+    public JournalQueueAck(){
+        this(new PBJournalQueueAck());
+    }
+
+    public JournalQueueAck(PBJournalQueueAck pb) {
+        this.pb=pb;
+    }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -37,22 +45,36 @@
      * @openwire:property version=1
      */
     public ActiveMQDestination getDestination() {
-        return destination;
+        if( pb.hasDestination() ) {
+            return PBConversionSupport.convert(pb.getDestination());
+        }
+        return null;
     }
 
     public void setDestination(ActiveMQDestination destination) {
-        this.destination = destination;
+        if( destination==null ) {
+            pb.clearDestination();
+        } else {
+            pb.setDestination(destination.getPB());
+        }
     }
 
     /**
      * @openwire:property version=1
      */
     public MessageAck getMessageAck() {
-        return messageAck;
+        if( pb.hasMessageAck() ) {
+            return new MessageAck(pb.getMessageAck());
+        }
+        return null;
     }
 
     public void setMessageAck(MessageAck messageAck) {
-        this.messageAck = messageAck;
+        if( messageAck==null ) {
+            pb.clearMessageAck();
+        } else {
+            pb.setMessageAck(messageAck.getPBCommand());
+        }
     }
 
     public boolean isMarshallAware() {
@@ -60,7 +82,15 @@
     }
 
     public String toString() {
-        return IntrospectionSupport.toString(this, JournalQueueAck.class);
+        return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
+    }
+
+    public Command getCommandObject() {
+        return null;
+    }
+
+    public PBJournalQueueAck getPBCommand() {
+        return pb;
     }
 
 }