You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 18:23:12 UTC

svn commit: r515654 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/openwire/v3/ main/java/org/apache/activemq/state/ test/java/org/apache/activemq/openw...

Author: chirino
Date: Wed Mar  7 09:23:07 2007
New Revision: 515654

URL: http://svn.apache.org/viewvc?view=rev&rev=515654
Log:
Added a new windowSize field to the ProducerInfo command and added a new ProducerAck command.  These will be needed to implement better producer flow control
where threads do not block on the broker side.


Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar  7 09:23:07 2007
@@ -51,6 +51,7 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -685,6 +686,12 @@
         return null;
     }
 
+    
+    public Response processProducerAck(ProducerAck ack) throws Exception {
+		// A broker should not get ProducerAck messages, so any that arrive are ignored.
+		return null; // no response is produced for the ack
+	}    
+
     public Connector getConnector(){
         return connector;
     }
@@ -1150,5 +1157,6 @@
 			log.debug("Could not stop transport: "+e,e);
 		}
     	}
-	}    
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Wed Mar  7 09:23:07 2007
@@ -57,6 +57,7 @@
     // and the server.
     //
     ///////////////////////////////////////////////////    
+    byte  PRODUCER_ACK                      = 19;
     byte  MESSAGE_PULL                      = 20;
     byte  MESSAGE_DISPATCH                  = 21;
     byte  MESSAGE_ACK                       = 22;

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java Wed Mar  7 09:23:07 2007
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+
+/**
+ * A ProducerAck command is sent by a broker to a producer to let it know it has received and processed 
+ * messages that it has produced.  The producer will be flow controlled if it does not receive 
+ * ProducerAck commands back from the broker.
+ * 
+ * @openwire:marshaller code="19" version="3"
+ * @version $Revision: 1.11 $
+ */
+public class ProducerAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE=CommandTypes.PRODUCER_ACK; // type code returned by getDataStructureType()
+    
+    protected ProducerId producerId; // producer this ack is destined for
+    protected int size; // number of bytes being acked
+    
+    public ProducerAck() {
+    }
+    
+    public void copy(ProducerAck copy) {
+        super.copy(copy);
+        copy.producerId = producerId; // direction: this command's fields are written INTO the argument
+        copy.size = size;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+    
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processProducerAck( this ); // hands this command to the visitor's ProducerAck handler
+    }
+
+    /**
+     * Returns the id of the producer that this ack message is destined for.
+     * 
+     * @openwire:property version=3
+     */
+	public ProducerId getProducerId() {
+		return producerId;
+	}
+
+	public void setProducerId(ProducerId producerId) {
+		this.producerId = producerId;
+	}
+
+    /**
+     * Returns the number of bytes that are being acked.
+     * 
+     * @openwire:property version=3
+     */
+	public int getSize() {
+		return size;
+	}
+
+	public void setSize(int size) {
+		this.size = size;
+	}
+
+
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java Wed Mar  7 09:23:07 2007
@@ -32,6 +32,7 @@
     protected ActiveMQDestination destination;
     protected BrokerId[] brokerPath;
     protected boolean dispatchAsync;
+    protected int windowSize;
     
     public ProducerInfo() {
     }
@@ -115,6 +116,21 @@
 
 	public void setDispatchAsync(boolean dispatchAsync) {
 		this.dispatchAsync = dispatchAsync;
+	}
+
+    /**
+     * Returns the configured producer window size.  A producer will
+     * send up to the configured window size worth of payload data to
+     * the broker before waiting for an Ack that allows it to send more.
+     * 
+     * @openwire:property version=3
+     */
+	public int getWindowSize() {
+		return windowSize;
+	}
+
+	public void setWindowSize(int windowSize) {
+		this.windowSize = windowSize; // value later reported by getWindowSize()
 	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java Wed Mar  7 09:23:07 2007
@@ -82,6 +82,7 @@
         add(new MessagePullMarshaller());
         add(new NetworkBridgeFilterMarshaller());
         add(new PartialCommandMarshaller());
+        add(new ProducerAckMarshaller());
         add(new ProducerIdMarshaller());
         add(new ProducerInfoMarshaller());
         add(new RemoveInfoMarshaller());

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java Wed Mar  7 09:23:07 2007
@@ -0,0 +1 @@
+/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for ProducerAckMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please see the modify the groovy scripts in the
 *        under src/gram/script and then use maven openwire:generate to regenerate 
 *        this file.
 *
 * @version $Revision$
 */
public class ProducerAckMarshaller extends BaseCommandMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return ProducerAck.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new ProducerAck();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the objec
 t from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        ProducerAck info = (ProducerAck)o;
        info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
        info.setSize(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        ProducerAck info = (ProducerAck)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);

        return rc + 4;
    }

    /**
     * Write a object instance to data output stream
     *
     * @param o the instance to be marshaled
     * @param dataOut th
 e output stream
     * @throws IOException thrown if an error occurs
     */
    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        ProducerAck info = (ProducerAck)o;
        tightMarshalNestedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
        dataOut.writeInt(info.getSize());

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        ProducerAck info = (ProducerAck)o;
        info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalNestedObject(wireFormat, dataI
 n));
        info.setSize(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {

        ProducerAck info = (ProducerAck)o;

        super.looseMarshal(wireFormat, o, dataOut);
        looseMarshalNestedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
        dataOut.writeInt(info.getSize());

    }
}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java Wed Mar  7 09:23:07 2007
@@ -1 +1 @@
-/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for ProducerInfoMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please see the modify the groovy scripts in the
 *        under src/gram/script and then use maven openwire:generate to regenerate 
 *        this file.
 *
 * @version $Revision$
 */
public class ProducerInfoMarshaller extends BaseCommandMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return ProducerInfo.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new ProducerInfo();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the o
 bject from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        ProducerInfo info = (ProducerInfo)o;
        info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
        info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));

        if (bs.readBoolean()) {
            short size = dataIn.readShort();
            org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
            for( int i=0; i < size; i++ ) {
                value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
            }
            info.setBrokerPath(value);
        }
        else {
            info.setBrokerPath(null);
    
     }
        info.setDispatchAsync(bs.readBoolean());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        ProducerInfo info = (ProducerInfo)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);
        rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
        rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
        bs.writeBoolean(info.isDispatchAsync());

        return rc + 0;
    }

    /**
     * Write a object instance to data output stream
     *
     * @param o the instance to be marshaled
     * @param dataOut the output stream
     * @throws IOException thrown if an error occurs
     */
    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataO
 utput dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        ProducerInfo info = (ProducerInfo)o;
        tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
        tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
        tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
        bs.readBoolean();

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        ProducerInfo info = (ProducerInfo)o;
        info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject
 (wireFormat, dataIn));
        info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));

        if (dataIn.readBoolean()) {
            short size = dataIn.readShort();
            org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
            for( int i=0; i < size; i++ ) {
                value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
            }
            info.setBrokerPath(value);
        }
        else {
            info.setBrokerPath(null);
        }
        info.setDispatchAsync(dataIn.readBoolean());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {

        ProducerInfo info = (ProducerInfo)o;

        super.looseMarshal(wireFormat, o, dataOut);
        looseMarsh
 alCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
        looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
        looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
        dataOut.writeBoolean(info.isDispatchAsync());

    }
}
\ No newline at end of file
+/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for ProducerInfoMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please see the modify the groovy scripts in the
 *        under src/gram/script and then use maven openwire:generate to regenerate 
 *        this file.
 *
 * @version $Revision$
 */
public class ProducerInfoMarshaller extends BaseCommandMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return ProducerInfo.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new ProducerInfo();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the o
 bject from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        ProducerInfo info = (ProducerInfo)o;
        info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
        info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));

        if (bs.readBoolean()) {
            short size = dataIn.readShort();
            org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
            for( int i=0; i < size; i++ ) {
                value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
            }
            info.setBrokerPath(value);
        }
        else {
            info.setBrokerPath(null);
    
     }
        info.setDispatchAsync(bs.readBoolean());
        info.setWindowSize(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        ProducerInfo info = (ProducerInfo)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);
        rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
        rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
        bs.writeBoolean(info.isDispatchAsync());

        return rc + 4;
    }

    /**
     * Write a object instance to data output stream
     *
     * @param o the instance to be marshaled
     * @param dataOut the output stream
     * @throws IOException thrown if an error occurs
     */
    public void tightMarsh
 al2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        ProducerInfo info = (ProducerInfo)o;
        tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
        tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
        tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
        bs.readBoolean();
        dataOut.writeInt(info.getWindowSize());

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        ProducerInfo info = (ProducerInfo)o;
 
        info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject(wireFormat, dataIn));
        info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));

        if (dataIn.readBoolean()) {
            short size = dataIn.readShort();
            org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
            for( int i=0; i < size; i++ ) {
                value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
            }
            info.setBrokerPath(value);
        }
        else {
            info.setBrokerPath(null);
        }
        info.setDispatchAsync(dataIn.readBoolean());
        info.setWindowSize(dataIn.readInt());

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut
 ) throws IOException {

        ProducerInfo info = (ProducerInfo)o;

        super.looseMarshal(wireFormat, o, dataOut);
        looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
        looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
        looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
        dataOut.writeBoolean(info.isDispatchAsync());
        dataOut.writeInt(info.getWindowSize());

    }
}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Wed Mar  7 09:23:07 2007
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -75,6 +76,7 @@
     Response processForgetTransaction(TransactionInfo info) throws Exception;
     Response processEndTransaction(TransactionInfo info) throws Exception;
     Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
+	Response processProducerAck(ProducerAck ack) throws Exception;
     
 }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Mar  7 09:23:07 2007
@@ -33,6 +33,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -480,6 +481,10 @@
         return null;
     }
 
+    public Response processProducerAck(ProducerAck ack) throws Exception {
+		return null; // the ack is not recorded here and no response is returned
+	}
+
     public boolean isRestoreConsumers() {
         return restoreConsumers;
     }
@@ -519,5 +524,6 @@
 	public void setRestoreTransaction(boolean restoreTransaction) {
 		this.restoreTransaction = restoreTransaction;
 	}
+
 
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java Wed Mar  7 09:23:07 2007
@@ -0,0 +1 @@
+/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.v3;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;


/**
 * Test case for the OpenWire marshalling for ProducerAck
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision: $
 */
public class ProducerAckTest extends BaseCommandTestSupport {

    // Builds a populated ProducerAck command for the OpenWire v3 marshalling tests.

    /** Shared instance of this test case (presumably reused by other generated tests; callers are not visible here). */
    public static ProducerAckTest SINGLETON = new ProducerAckTest();

    /**
     * Creates a new ProducerAck and fills it in via {@link #populateObject(Object)}.
     *
     * @return the populated ProducerAck
     * @throws Exception if populating the command fails
     */
    public Object createObject() throws Exception {
        ProducerAck info = new ProducerAck();
        populateObject(info);
        return info;
    }

    /**
     * Populates the given command: delegates to the superclass implementation first,
     * then sets the ProducerAck-specific fields (producer id and size).
     *
     * @param object the command to populate; it is cast to ProducerAck
     * @throws Exception if the superclass or a helper fails
     */
    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);
        ProducerAck info = (ProducerAck) object;

        info.setProducerId(createProducerId("ProducerId:1"));
        info.setSize(1);
    }
}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java Wed Mar  7 09:23:07 2007
@@ -1 +1 @@
-/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.v3;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;


/**
 * Test case for the OpenWire marshalling for ProducerInfo
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision: $
 */
public class ProducerInfoTest extends BaseCommandTestSupport {


    public static ProducerInfoTest SINGLETON = new ProducerInfoTest();

    public Object createObject() throws Exception {
        ProducerInfo info = new ProducerInfo();
        populateObject(info);
        return info;
    }

    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);
        ProducerInfo info = (ProducerInfo) object;

        info.setProducerId(createProducerId("ProducerId:1"));
        info.setDestination(createActiveMQDestination("Destination:2"));
        {
            Br
 okerId value[] = new BrokerId[2];
            for( int i=0; i < 2; i++ ) {
                value[i] = createBrokerId("BrokerPath:3");
            }
            info.setBrokerPath(value);
        }
        info.setDispatchAsync(true);
    }
}
\ No newline at end of file
+/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.v3;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;


/**
 * Test case for the OpenWire marshalling for ProducerInfo
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please modify the groovy scripts
 *        under src/gram/script and then use maven openwire:generate to regenerate
 *        this file.
 *
 * @version $Revision: $
 */
public class ProducerInfoTest extends BaseCommandTestSupport {


    public static ProducerInfoTest SINGLETON = new ProducerInfoTest();

    public Object createObject() throws Exception {
        ProducerInfo info = new ProducerInfo();
        populateObject(info);
        return info;
    }

    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);
        ProducerInfo info = (ProducerInfo) object;

        info.setProducerId(createProducerId("ProducerId:1"));
        info.setDestination(createActiveMQDestination("Destination:2"));
        {
            Br
 okerId value[] = new BrokerId[2];
            for( int i=0; i < 2; i++ ) {
                value[i] = createBrokerId("BrokerPath:3");
            }
            info.setBrokerPath(value);
        }
        info.setDispatchAsync(true);
        info.setWindowSize(1);
    }
}
\ No newline at end of file