You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 18:23:12 UTC
svn commit: r515654 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/command/
main/java/org/apache/activemq/openwire/v3/
main/java/org/apache/activemq/state/ test/java/org/apache/activemq/openw...
Author: chirino
Date: Wed Mar 7 09:23:07 2007
New Revision: 515654
URL: http://svn.apache.org/viewvc?view=rev&rev=515654
Log:
Added a new windowSize field to the ProducerInfo command and added a new ProducerAck command. These will be needed to implement better producer flow control
where threads do not block on the broker side.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar 7 09:23:07 2007
@@ -51,6 +51,7 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -685,6 +686,12 @@
return null;
}
+
+ public Response processProducerAck(ProducerAck ack) throws Exception {
+ // A broker should not get ProducerAck messages; the command is ignored and null is returned.
+ return null;
+ }
+
public Connector getConnector(){
return connector;
}
@@ -1150,5 +1157,6 @@
log.debug("Could not stop transport: "+e,e);
}
}
- }
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Wed Mar 7 09:23:07 2007
@@ -57,6 +57,7 @@
// and the server.
//
///////////////////////////////////////////////////
+ byte PRODUCER_ACK = 19;
byte MESSAGE_PULL = 20;
byte MESSAGE_DISPATCH = 21;
byte MESSAGE_ACK = 22;
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java Wed Mar 7 09:23:07 2007
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+
+/**
+ * A ProducerAck command is sent by a broker to a producer to let it know it has received and processed
+ * messages that it has produced. The producer will be flow controlled if it does not receive
+ * ProducerAck commands back from the broker.
+ *
+ * @openwire:marshaller code="19" version="3"
+ * @version $Revision: 1.11 $
+ */
+public class ProducerAck extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE=CommandTypes.PRODUCER_ACK; // type code returned by getDataStructureType()
+
+ protected ProducerId producerId; // exposed through getProducerId()/setProducerId()
+ protected int size; // exposed through getSize()/setSize()
+
+ public ProducerAck() {
+ }
+
+ public void copy(ProducerAck copy) { // NOTE: 'copy' is the target; this command's fields are written into it
+ super.copy(copy);
+ copy.producerId = producerId;
+ copy.size = size;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processProducerAck( this ); // hand this command to the visitor's ProducerAck callback and return its result
+ }
+
+ /**
+ * The producer id that this ack message is destined for.
+ *
+ * @openwire:property version=3
+ */
+ public ProducerId getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(ProducerId producerId) {
+ this.producerId = producerId;
+ }
+
+ /**
+ * The number of bytes that are being acked.
+ *
+ * @openwire:property version=3
+ */
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java Wed Mar 7 09:23:07 2007
@@ -32,6 +32,7 @@
protected ActiveMQDestination destination;
protected BrokerId[] brokerPath;
protected boolean dispatchAsync;
+ protected int windowSize;
public ProducerInfo() {
}
@@ -115,6 +116,21 @@
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
+ }
+
+ /**
+ * Used to configure the producer window size. A producer will
+ * send up to the configured window size worth of payload data to
+ * the broker before waiting for an Ack that allows it to send more.
+ *
+ * @openwire:property version=3
+ */
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ public void setWindowSize(int windowSize) {
+ this.windowSize = windowSize;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java Wed Mar 7 09:23:07 2007
@@ -82,6 +82,7 @@
add(new MessagePullMarshaller());
add(new NetworkBridgeFilterMarshaller());
add(new PartialCommandMarshaller());
+ add(new ProducerAckMarshaller());
add(new ProducerIdMarshaller());
add(new ProducerInfoMarshaller());
add(new RemoveInfoMarshaller());
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java Wed Mar 7 09:23:07 2007
@@ -0,0 +1 @@
+/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ProducerAckMarshaller
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ProducerAckMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ProducerAck.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ProducerAck();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the objec
t from
* @throws IOException
*/
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
ProducerAck info = (ProducerAck)o;
info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
info.setSize(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ProducerAck info = (ProducerAck)o;
int rc = super.tightMarshal1(wireFormat, o, bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);
return rc + 4;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut th
e output stream
* @throws IOException thrown if an error occurs
*/
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
ProducerAck info = (ProducerAck)o;
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
dataOut.writeInt(info.getSize());
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
ProducerAck info = (ProducerAck)o;
info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalNestedObject(wireFormat, dataI
n));
info.setSize(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {
ProducerAck info = (ProducerAck)o;
super.looseMarshal(wireFormat, o, dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
dataOut.writeInt(info.getSize());
}
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java Wed Mar 7 09:23:07 2007
@@ -1 +1 @@
-/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ProducerInfoMarshaller
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ProducerInfoMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ProducerInfo.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ProducerInfo();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the o
bject from
* @throws IOException
*/
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
ProducerInfo info = (ProducerInfo)o;
info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
if (bs.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
}
info.setBrokerPath(value);
}
else {
info.setBrokerPath(null);
}
info.setDispatchAsync(bs.readBoolean());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ProducerInfo info = (ProducerInfo)o;
int rc = super.tightMarshal1(wireFormat, o, bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
bs.writeBoolean(info.isDispatchAsync());
return rc + 0;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
*/
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataO
utput dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
ProducerInfo info = (ProducerInfo)o;
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
bs.readBoolean();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
ProducerInfo info = (ProducerInfo)o;
info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject
(wireFormat, dataIn));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
if (dataIn.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
}
info.setBrokerPath(value);
}
else {
info.setBrokerPath(null);
}
info.setDispatchAsync(dataIn.readBoolean());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {
ProducerInfo info = (ProducerInfo)o;
super.looseMarshal(wireFormat, o, dataOut);
looseMarsh
alCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
dataOut.writeBoolean(info.isDispatchAsync());
}
}
\ No newline at end of file
+/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ProducerInfoMarshaller
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ProducerInfoMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ProducerInfo.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ProducerInfo();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the o
bject from
* @throws IOException
*/
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
ProducerInfo info = (ProducerInfo)o;
info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
if (bs.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
}
info.setBrokerPath(value);
}
else {
info.setBrokerPath(null);
}
info.setDispatchAsync(bs.readBoolean());
info.setWindowSize(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ProducerInfo info = (ProducerInfo)o;
int rc = super.tightMarshal1(wireFormat, o, bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
bs.writeBoolean(info.isDispatchAsync());
return rc + 4;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
*/
public void tightMarsh
al2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
ProducerInfo info = (ProducerInfo)o;
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs);
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
bs.readBoolean();
dataOut.writeInt(info.getWindowSize());
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
ProducerInfo info = (ProducerInfo)o;
info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
if (dataIn.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
}
info.setBrokerPath(value);
}
else {
info.setBrokerPath(null);
}
info.setDispatchAsync(dataIn.readBoolean());
info.setWindowSize(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut
) throws IOException {
ProducerInfo info = (ProducerInfo)o;
super.looseMarshal(wireFormat, o, dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
dataOut.writeBoolean(info.isDispatchAsync());
dataOut.writeInt(info.getWindowSize());
}
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Wed Mar 7 09:23:07 2007
@@ -29,6 +29,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -75,6 +76,7 @@
Response processForgetTransaction(TransactionInfo info) throws Exception;
Response processEndTransaction(TransactionInfo info) throws Exception;
Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
+ Response processProducerAck(ProducerAck ack) throws Exception;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Mar 7 09:23:07 2007
@@ -33,6 +33,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -480,6 +481,10 @@
return null;
}
+ public Response processProducerAck(ProducerAck ack) throws Exception {
+ return null; // the ack is ignored here: nothing is recorded and no response is built
+ }
+
public boolean isRestoreConsumers() {
return restoreConsumers;
}
@@ -519,5 +524,6 @@
public void setRestoreTransaction(boolean restoreTransaction) {
this.restoreTransaction = restoreTransaction;
}
+
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java?view=auto&rev=515654
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java Wed Mar 7 09:23:07 2007
@@ -0,0 +1 @@
+/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for ProducerAck
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class ProducerAckTest extends BaseCommandTestSupport {

    public static ProducerAckTest SINGLETON = new ProducerAckTest();

    /**
     * Builds a ProducerAck and fills it in via populateObject.
     *
     * @return the populated ProducerAck
     */
    public Object createObject() throws Exception {
        ProducerAck ack = new ProducerAck();
        populateObject(ack);
        return ack;
    }

    /**
     * Lets the superclass populate its fields first, then sets the
     * ProducerAck-specific producerId and size.
     *
     * @param object the ProducerAck to populate
     */
    protected void populateObject(Object object) throws Exception {
        super.populateObject(object);

        ProducerAck ack = (ProducerAck) object;
        ack.setProducerId(createProducerId("ProducerId:1"));
        ack.setSize(1);
    }
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java?view=diff&rev=515654&r1=515653&r2=515654
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java Wed Mar 7 09:23:07 2007
@@ -1 +1 @@
-/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for ProducerInfo
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class ProducerInfoTest extends BaseCommandTestSupport {
public static ProducerInfoTest SINGLETON = new ProducerInfoTest();
public Object createObject() throws Exception {
ProducerInfo info = new ProducerInfo();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
ProducerInfo info = (ProducerInfo) object;
info.setProducerId(createProducerId("ProducerId:1"));
info.setDestination(createActiveMQDestination("Destination:2"));
{
Br
okerId value[] = new BrokerId[2];
for( int i=0; i < 2; i++ ) {
value[i] = createBrokerId("BrokerPath:3");
}
info.setBrokerPath(value);
}
info.setDispatchAsync(true);
}
}
\ No newline at end of file
+/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for ProducerInfo
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class ProducerInfoTest extends BaseCommandTestSupport {
public static ProducerInfoTest SINGLETON = new ProducerInfoTest();
public Object createObject() throws Exception {
ProducerInfo info = new ProducerInfo();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
ProducerInfo info = (ProducerInfo) object;
info.setProducerId(createProducerId("ProducerId:1"));
info.setDestination(createActiveMQDestination("Destination:2"));
{
Br
okerId value[] = new BrokerId[2];
for( int i=0; i < 2; i++ ) {
value[i] = createBrokerId("BrokerPath:3");
}
info.setBrokerPath(value);
}
info.setDispatchAsync(true);
info.setWindowSize(1);
}
}
\ No newline at end of file