You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/15 21:12:28 UTC
svn commit: r704995 [2/3] - in /activemq/sandbox/chirino-pb/activemq-core:
./ src/main/java/org/apache/activemq/advisory/
src/main/java/org/apache/activemq/broker/
src/main/java/org/apache/activemq/command/
src/main/java/org/apache/activemq/pbwire/ src...
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java Wed Oct 15 12:12:26 2008
@@ -16,22 +16,25 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.util.IntrospectionSupport;
/**
* @openwire:marshaller code="50"
* @version $Revision$
*/
-public class JournalTopicAck implements DataStructure {
+public class JournalTopicAck implements DataStructure, ProtocolBufferBacked {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_ACK;
- ActiveMQDestination destination;
- String clientId;
- String subscritionName;
- MessageId messageId;
- long messageSequenceId;
- TransactionId transactionId;
+ private final PBJournalTopicAck pb;
+
+ public JournalTopicAck() {
+ this(new PBJournalTopicAck());
+ }
+ public JournalTopicAck(PBJournalTopicAck pb) {
+ this.pb = pb;
+ }
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -41,66 +44,87 @@
* @openwire:property version=1
*/
public ActiveMQDestination getDestination() {
- return destination;
+ if( pb.hasDestination() ) {
+ return PBConversionSupport.convert(pb.getDestination());
+ }
+ return null;
}
public void setDestination(ActiveMQDestination destination) {
- this.destination = destination;
+ if( destination==null ) {
+ pb.clearDestination();
+ } else {
+ pb.setDestination(destination.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public MessageId getMessageId() {
- return messageId;
+ if( pb.hasMessageId() ) {
+ return new MessageId(pb.getMessageId());
+ }
+ return null;
}
public void setMessageId(MessageId messageId) {
- this.messageId = messageId;
+ if( messageId==null ) {
+ pb.clearMessageId();
+ } else {
+ pb.setMessageId(messageId.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public long getMessageSequenceId() {
- return messageSequenceId;
+ return pb.getMessageSequenceId();
}
public void setMessageSequenceId(long messageSequenceId) {
- this.messageSequenceId = messageSequenceId;
+ this.pb.setMessageSequenceId(messageSequenceId);
}
/**
* @openwire:property version=1
*/
public String getSubscritionName() {
- return subscritionName;
+ return pb.getSubscritionName();
}
public void setSubscritionName(String subscritionName) {
- this.subscritionName = subscritionName;
+ this.pb.setSubscritionName(subscritionName);
}
/**
* @openwire:property version=1
*/
public String getClientId() {
- return clientId;
+ return pb.getClientId();
}
public void setClientId(String clientId) {
- this.clientId = clientId;
+ this.pb.setClientId(clientId);
}
/**
* @openwire:property version=1
*/
public TransactionId getTransactionId() {
- return transactionId;
+ if( pb.hasTransactionId() ) {
+ return PBConversionSupport.convert(pb.getTransactionId());
+ }
+ return null;
}
public void setTransactionId(TransactionId transaction) {
- this.transactionId = transaction;
+ if( transaction == null ) {
+ pb.clearTransactionId();
+ } else {
+ pb.setTransactionId(transaction.getPB());
+ }
}
public boolean isMarshallAware() {
@@ -108,6 +132,12 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, JournalTopicAck.class);
+ return getClass().getSimpleName()+"[\n"+pb.toString()+"]";
+ }
+ public Command getCommandObject() {
+ return null;
+ }
+ public PBJournalTopicAck getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java Wed Oct 15 12:12:26 2008
@@ -16,24 +16,30 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBCommand;
import org.apache.activemq.util.IntrospectionSupport;
/**
* @openwire:marshaller code="53"
* @version $Revision: 1.6 $
*/
-public class JournalTrace implements DataStructure {
+public class JournalTrace implements DataStructure, ProtocolBufferBacked {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRACE;
- private String message;
+ protected final PBJournalTrace pb;
public JournalTrace() {
-
+ this(new PBJournalTrace());
}
public JournalTrace(String message) {
- this.message = message;
+ this();
+ setMessage(message);
+ }
+
+ public JournalTrace(PBJournalTrace pb) {
+ this.pb = pb;
}
public byte getDataStructureType() {
@@ -44,14 +50,14 @@
* @openwire:property version=1
*/
public String getMessage() {
- return message;
+ return pb.getMessage();
}
/**
* @openwire:property version=1
*/
public void setMessage(String message) {
- this.message = message;
+ this.pb.setMessage(message);
}
public boolean isMarshallAware() {
@@ -59,6 +65,14 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, JournalTrace.class);
+ return getClass().getSimpleName()+"[\n"+pb.toString()+"]";
+ }
+
+ public Command getCommandObject() {
+ return null;
+ }
+
+ public PBJournalTrace getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java Wed Oct 15 12:12:26 2008
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.util.IntrospectionSupport;
/**
* @openwire:marshaller code="54"
*/
-public class JournalTransaction implements DataStructure {
+public class JournalTransaction implements DataStructure, ProtocolBufferBacked {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRANSACTION;
@@ -31,17 +32,21 @@
public static final byte LOCAL_COMMIT = 4;
public static final byte LOCAL_ROLLBACK = 5;
- public byte type;
- public boolean wasPrepared;
- public TransactionId transactionId;
+ protected final PBJournalTransaction pb;
public JournalTransaction(byte type, TransactionId transactionId, boolean wasPrepared) {
- this.type = type;
- this.transactionId = transactionId;
- this.wasPrepared = wasPrepared;
+ this();
+ setType(type);
+ setTransactionId(transactionId);
+ setWasPrepared(wasPrepared);
}
public JournalTransaction() {
+ this(new PBJournalTransaction());
+ }
+
+ public JournalTransaction(PBJournalTransaction pb) {
+ this.pb = pb;
}
public byte getDataStructureType() {
@@ -52,33 +57,40 @@
* @openwire:property version=1
*/
public TransactionId getTransactionId() {
- return transactionId;
+ if( pb.hasTransactionId() ) {
+ return PBConversionSupport.convert(pb.getTransactionId());
+ }
+ return null;
}
public void setTransactionId(TransactionId transactionId) {
- this.transactionId = transactionId;
+ if( transactionId==null ) {
+ pb.clearTransactionId();
+ } else {
+ pb.setTransactionId(PBConversionSupport.convert(transactionId));
+ }
}
/**
* @openwire:property version=1
*/
public byte getType() {
- return type;
+ return (byte) pb.getType();
}
public void setType(byte type) {
- this.type = type;
+ this.pb.setType(type);
}
/**
* @openwire:property version=1
*/
public boolean getWasPrepared() {
- return wasPrepared;
+ return pb.getWasPrepared();
}
public void setWasPrepared(boolean wasPrepared) {
- this.wasPrepared = wasPrepared;
+ this.pb.setWasPrepared(wasPrepared);
}
public boolean isMarshallAware() {
@@ -86,6 +98,14 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, JournalTransaction.class);
+ return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
+ }
+
+ public Command getCommandObject() {
+ return null;
+ }
+
+ public PBJournalTransaction getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java Wed Oct 15 12:12:26 2008
@@ -30,6 +30,15 @@
private transient Endpoint from;
private transient Endpoint to;
+ protected PBKeepAliveInfo pb = new PBKeepAliveInfo();
+
+ public KeepAliveInfo() {
+ }
+
+ public KeepAliveInfo(PBKeepAliveInfo pb) {
+ this.pb=pb;
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -98,6 +107,10 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, KeepAliveInfo.class);
+ return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
+ }
+
+ public PBKeepAliveInfo getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java Wed Oct 15 12:12:26 2008
@@ -24,18 +24,21 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_LOCAL_TRANSACTION_ID;
- protected ConnectionId connectionId;
- protected long value;
+ protected PBTransactionId pb = new PBTransactionId();
private transient String transactionKey;
private transient int hashCode;
public LocalTransactionId() {
}
+
+ public LocalTransactionId(PBTransactionId pb) {
+ this.pb = pb;
+ }
public LocalTransactionId(ConnectionId connectionId, long transactionId) {
- this.connectionId = connectionId;
- this.value = transactionId;
+ setConnectionId(connectionId);
+ setValue(transactionId);
}
public byte getDataStructureType() {
@@ -52,7 +55,7 @@
public String getTransactionKey() {
if (transactionKey == null) {
- transactionKey = "TX:" + connectionId + ":" + value;
+ transactionKey = "TX:" + getConnectionId() + ":" + getValue();
}
return transactionKey;
}
@@ -63,7 +66,7 @@
public int hashCode() {
if (hashCode == 0) {
- hashCode = connectionId.hashCode() ^ (int)value;
+ hashCode = pb.hashCode();
}
return hashCode;
}
@@ -76,7 +79,7 @@
return false;
}
LocalTransactionId tx = (LocalTransactionId)o;
- return value == tx.value && connectionId.equals(tx.connectionId);
+ return tx.pb.equals(pb);
}
/**
@@ -85,9 +88,9 @@
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(LocalTransactionId o) {
- int result = connectionId.compareTo(o.connectionId);
+ int result = getConnectionId().compareTo(o.getConnectionId());
if (result == 0) {
- result = (int)(value - o.value);
+ result = (int)(getValue() - o.getValue());
}
return result;
}
@@ -96,22 +99,26 @@
* @openwire:property version=1
*/
public long getValue() {
- return value;
+ return pb.getLocalTransactionId().getId();
}
public void setValue(long transactionId) {
- this.value = transactionId;
+ this.pb.getLocalTransactionId().setId(transactionId);
}
/**
* @openwire:property version=1 cache=true
*/
public ConnectionId getConnectionId() {
- return connectionId;
+ return new ConnectionId(pb.getLocalTransactionId().getConnectionId());
}
public void setConnectionId(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ pb.getLocalTransactionId().setConnectionId(connectionId.getValue());
+ }
+
+ public PBTransactionId getPB() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java Wed Oct 15 12:12:26 2008
@@ -19,15 +19,20 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.pbwire.PBCommand;
+import org.apache.activemq.pbwire.PBConversionSupport;
+import org.apache.activemq.pbwire.PBWireFormat;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
@@ -35,6 +40,8 @@
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
+import com.google.protobuf.ByteString;
+
/**
* Represents an ActiveMQ message
*
@@ -47,74 +54,37 @@
* The default minimum amount of memory a message is assumed to use
*/
public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
-
- protected MessageId messageId;
- protected ActiveMQDestination originalDestination;
- protected TransactionId originalTransactionId;
-
- protected ProducerId producerId;
- protected ActiveMQDestination destination;
- protected TransactionId transactionId;
-
- protected long expiration;
- protected long timestamp;
- protected long arrival;
- protected long brokerInTime;
- protected long brokerOutTime;
- protected String correlationId;
- protected ActiveMQDestination replyTo;
- protected boolean persistent;
- protected String type;
- protected byte priority;
- protected String groupID;
- protected int groupSequence;
- protected ConsumerId targetConsumerId;
- protected boolean compressed;
- protected String userID;
-
- protected ByteSequence content;
- protected ByteSequence marshalledProperties;
- protected DataStructure dataStructure;
- protected int redeliveryCounter;
+
+ protected PBMessage pb;
protected int size;
protected Map<String, Object> properties;
protected boolean readOnlyProperties;
protected boolean readOnlyBody;
protected transient boolean recievedByDFBridge;
- protected boolean droppable;
+ protected DataStructure dataStructure;
+ protected ByteSequence marshalledProperties;
private transient short referenceCount;
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination regionDestination;
private transient MemoryUsage memoryUsage;
- private BrokerId[] brokerPath;
- private BrokerId[] cluster;
+ private ActiveMQDestination destination;
+ public Message(PBMessage pb) {
+ this.pb = pb;
+ }
+
public abstract Message copy();
public abstract void clearBody() throws JMSException;
protected void copy(Message copy) {
super.copy(copy);
- copy.producerId = producerId;
- copy.transactionId = transactionId;
+ copy.pb = pb.clone();
+
copy.destination = destination;
- copy.messageId = messageId != null ? messageId.copy() : null;
- copy.originalDestination = originalDestination;
- copy.originalTransactionId = originalTransactionId;
- copy.expiration = expiration;
- copy.timestamp = timestamp;
- copy.correlationId = correlationId;
- copy.replyTo = replyTo;
- copy.persistent = persistent;
- copy.redeliveryCounter = redeliveryCounter;
- copy.type = type;
- copy.priority = priority;
copy.size = size;
- copy.groupID = groupID;
- copy.userID = userID;
- copy.groupSequence = groupSequence;
if (properties != null) {
copy.properties = new HashMap<String, Object>(properties);
@@ -122,33 +92,26 @@
copy.properties = properties;
}
- copy.content = content;
copy.marshalledProperties = marshalledProperties;
copy.dataStructure = dataStructure;
copy.readOnlyProperties = readOnlyProperties;
copy.readOnlyBody = readOnlyBody;
- copy.compressed = compressed;
copy.recievedByDFBridge = recievedByDFBridge;
- copy.arrival = arrival;
copy.connection = connection;
copy.regionDestination = regionDestination;
- copy.brokerInTime = brokerInTime;
- copy.brokerOutTime = brokerOutTime;
copy.memoryUsage=this.memoryUsage;
- copy.brokerPath = brokerPath;
-
- // lets not copy the following fields
- // copy.targetConsumerId = targetConsumerId;
- // copy.referenceCount = referenceCount;
}
public Object getProperty(String name) throws IOException {
if (properties == null) {
- if (marshalledProperties == null) {
- return null;
+ if (marshalledProperties != null) {
+ properties = unmarsallProperties(marshalledProperties);
+ } else if( pb.hasProperty() ) {
+ properties = PBConversionSupport.convertMap(pb.getPropertyList());
+ } else {
+ properties = new HashMap<String, Object>();
}
- properties = unmarsallProperties(marshalledProperties);
}
return properties.get(name);
}
@@ -156,16 +119,20 @@
@SuppressWarnings("unchecked")
public Map<String, Object> getProperties() throws IOException {
if (properties == null) {
- if (marshalledProperties == null) {
- return Collections.EMPTY_MAP;
+ if (marshalledProperties != null) {
+ properties = unmarsallProperties(marshalledProperties);
+ } else if( pb.hasProperty() ) {
+ properties = PBConversionSupport.convertMap(pb.getPropertyList());
+ } else {
+ properties = new HashMap<String, Object>();
}
- properties = unmarsallProperties(marshalledProperties);
}
return Collections.unmodifiableMap(properties);
}
public void clearProperties() {
marshalledProperties = null;
+ pb.clearProperty();
properties = null;
}
@@ -176,11 +143,14 @@
protected void lazyCreateProperties() throws IOException {
if (properties == null) {
- if (marshalledProperties == null) {
- properties = new HashMap<String, Object>();
- } else {
+ if (marshalledProperties != null) {
properties = unmarsallProperties(marshalledProperties);
marshalledProperties = null;
+ } else if( pb.hasProperty() ) {
+ properties = PBConversionSupport.convertMap(pb.getPropertyList());
+ pb.clearProperty();
+ } else {
+ properties = new HashMap<String, Object>();
}
}
}
@@ -190,13 +160,22 @@
}
public void beforeMarshall(WireFormat wireFormat) throws IOException {
- // Need to marshal the properties.
- if (marshalledProperties == null && properties != null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream os = new DataOutputStream(baos);
- MarshallingSupport.marshalPrimitiveMap(properties, os);
- os.close();
- marshalledProperties = baos.toByteSequence();
+ if( properties!=null ) {
+ if( wireFormat.getClass() == PBWireFormat.class && !pb.hasProperty() ) {
+ pb.setPropertyList(PBConversionSupport.convert(properties));
+ } else if (marshalledProperties == null) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ MarshallingSupport.marshalPrimitiveMap(properties, os);
+ os.close();
+ marshalledProperties = baos.toByteSequence();
+ }
+ }
+ }
+
+ private void marshalPBProps() {
+ if( properties!=null && !pb.hasProperty() ) {
+ pb.setPropertyList(PBConversionSupport.convert(properties));
}
}
@@ -219,180 +198,229 @@
* @openwire:property version=1 cache=true
*/
public ProducerId getProducerId() {
- return producerId;
+ if( pb.hasMessageId() && pb.getMessageId().hasProducerId() ) {
+ return new ProducerId(pb.getMessageId().getProducerId());
+ }
+ return null;
}
public void setProducerId(ProducerId producerId) {
- this.producerId = producerId;
+ this.pb.getMessageId().setProducerId(producerId.getPB());
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( destination==null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ pb.setDestination(destination.getPB());
}
/**
* @openwire:property version=1 cache=true
*/
public TransactionId getTransactionId() {
- return transactionId;
+ if( pb.hasTransactionId() ) {
+ return PBConversionSupport.convert(pb.getTransactionId());
+ } else {
+ return null;
+ }
}
public void setTransactionId(TransactionId transactionId) {
- this.transactionId = transactionId;
+ if( transactionId==null ) {
+ pb.clearTransactionId();
+ } else {
+ this.pb.setTransactionId(PBConversionSupport.convert(transactionId));
+ }
}
public boolean isInTransaction() {
- return transactionId != null;
+ return pb.hasTransactionId();
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getOriginalDestination() {
- return originalDestination;
+ if( pb.hasOriginalDestination() ) {
+ return PBConversionSupport.convert(pb.getOriginalDestination());
+ }
+ return null;
}
public void setOriginalDestination(ActiveMQDestination destination) {
- this.originalDestination = destination;
+ if( destination==null ) {
+ pb.clearOriginalDestination();
+ } else {
+ this.pb.setOriginalDestination(destination.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public MessageId getMessageId() {
- return messageId;
+ return new MessageId(pb.getMessageId());
}
public void setMessageId(MessageId messageId) {
- this.messageId = messageId;
+ this.pb.setMessageId(messageId.getPB());
}
/**
* @openwire:property version=1 cache=true
*/
public TransactionId getOriginalTransactionId() {
- return originalTransactionId;
+ if( pb.hasOriginalTransactionId() ) {
+ return PBConversionSupport.convert(pb.getOriginalTransactionId());
+ }
+ return null;
}
public void setOriginalTransactionId(TransactionId transactionId) {
- this.originalTransactionId = transactionId;
+ if( transactionId==null ) {
+ pb.clearOriginalTransactionId();
+ } else {
+ this.pb.setOriginalTransactionId(PBConversionSupport.convert(transactionId));
+ }
}
/**
* @openwire:property version=1
*/
public String getGroupID() {
- return groupID;
+ return pb.getGroupID();
}
public void setGroupID(String groupID) {
- this.groupID = groupID;
+ this.pb.setGroupID(groupID);
}
/**
* @openwire:property version=1
*/
public int getGroupSequence() {
- return groupSequence;
+ return pb.getGroupSequence();
}
public void setGroupSequence(int groupSequence) {
- this.groupSequence = groupSequence;
+ this.pb.setGroupSequence(groupSequence);
}
/**
* @openwire:property version=1
*/
public String getCorrelationId() {
- return correlationId;
+ return pb.getCorrelationId();
}
public void setCorrelationId(String correlationId) {
- this.correlationId = correlationId;
+ this.pb.setCorrelationId(correlationId);
}
/**
* @openwire:property version=1
*/
public boolean isPersistent() {
- return persistent;
+ return pb.getPersistent();
}
- public void setPersistent(boolean deliveryMode) {
- this.persistent = deliveryMode;
+ public void setPersistent(boolean persistent) {
+ this.pb.setPersistent(persistent);
}
/**
* @openwire:property version=1
*/
public long getExpiration() {
- return expiration;
+ return pb.getExpiration();
}
public void setExpiration(long expiration) {
- this.expiration = expiration;
+ if( expiration==0) {
+ pb.clearExpiration();
+ } else {
+ this.pb.setExpiration(expiration);
+ }
}
/**
* @openwire:property version=1
*/
public byte getPriority() {
- return priority;
+ return (byte) pb.getPriority();
}
public void setPriority(byte priority) {
- this.priority = priority;
+ this.pb.setPriority(priority);
}
/**
* @openwire:property version=1
*/
public ActiveMQDestination getReplyTo() {
- return replyTo;
+ if( pb.hasReplyTo() ) {
+ return PBConversionSupport.convert(pb.getReplyTo());
+ }
+ return null;
}
public void setReplyTo(ActiveMQDestination replyTo) {
- this.replyTo = replyTo;
+ if( replyTo==null ) {
+ pb.clearReplyTo();
+ } else {
+ this.pb.setReplyTo((replyTo).getPB());
+ }
}
/**
* @openwire:property version=1
*/
public long getTimestamp() {
- return timestamp;
+ return pb.getTimestamp();
}
public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ this.pb.setTimestamp(timestamp);
}
/**
* @openwire:property version=1
*/
public String getType() {
- return type;
+ return pb.getType();
}
public void setType(String type) {
- this.type = type;
+ this.pb.setType(type);
}
/**
* @openwire:property version=1
*/
public ByteSequence getContent() {
- return content;
+ if( !pb.hasContent() ) {
+ return null;
+ } else {
+ return new ByteSequence(pb.getContent().toByteArray());
+ }
}
public void setContent(ByteSequence content) {
- this.content = content;
+ if( content==null ) {
+ pb.clearContent();
+ } else {
+ content.compact();
+ pb.setContent(ByteString.copyFrom(content.data));
+ }
}
/**
@@ -427,11 +455,18 @@
* @openwire:property version=1 cache=true
*/
public ConsumerId getTargetConsumerId() {
- return targetConsumerId;
+ if( pb.hasTargetConsumerId() ) {
+ return new ConsumerId(pb.getTargetConsumerId());
+ }
+ return null;
}
public void setTargetConsumerId(ConsumerId targetConsumerId) {
- this.targetConsumerId = targetConsumerId;
+ if( targetConsumerId==null ) {
+ pb.clearTargetConsumerId();
+ } else {
+ this.pb.setTargetConsumerId(targetConsumerId.getPB());
+ }
}
public boolean isExpired() {
@@ -443,22 +478,22 @@
}
public boolean isAdvisory() {
- return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ return getType() != null && getType().equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
}
/**
* @openwire:property version=1
*/
public boolean isCompressed() {
- return compressed;
+ return pb.getCompressed();
}
public void setCompressed(boolean compressed) {
- this.compressed = compressed;
+ this.pb.setCompressed(compressed);
}
public boolean isRedelivered() {
- return redeliveryCounter > 0;
+ return pb.getRedeliveryCounter() > 0;
}
public void setRedelivered(boolean redelivered) {
@@ -474,18 +509,18 @@
}
public void incrementRedeliveryCounter() {
- redeliveryCounter++;
+ setRedeliveryCounter(getRedeliveryCounter()+1);
}
/**
* @openwire:property version=1
*/
public int getRedeliveryCounter() {
- return redeliveryCounter;
+ return pb.getRedeliveryCounter();
}
- public void setRedeliveryCounter(int deliveryCounter) {
- this.redeliveryCounter = deliveryCounter;
+ public void setRedeliveryCounter(int redeliveryCounter) {
+ this.pb.setRedeliveryCounter(redeliveryCounter);
}
/**
@@ -494,11 +529,20 @@
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
- return brokerPath;
+ BrokerId rc[]=null;
+ if( pb.hasBrokerPath() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+ }
+ return rc;
}
public void setBrokerPath(BrokerId[] brokerPath) {
- this.brokerPath = brokerPath;
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setBrokerPathList(rc);
+ } else {
+ pb.clearBrokerPath();
+ }
}
public boolean isReadOnlyProperties() {
@@ -533,11 +577,11 @@
* @openwire:property version=1
*/
public long getArrival() {
- return arrival;
+ return pb.getArrival();
}
public void setArrival(long arrival) {
- this.arrival = arrival;
+ this.pb.setArrival(arrival);
}
/**
@@ -548,11 +592,11 @@
* @openwire:property version=1
*/
public String getUserID() {
- return userID;
+ return pb.getUserId();
}
- public void setUserID(String jmsxUserID) {
- this.userID = jmsxUserID;
+ public void setUserID(String userId) {
+ this.pb.setUserId(userId);
}
public int getReferenceCount() {
@@ -625,13 +669,8 @@
public int getSize() {
int minimumMessageSize = getMinimumMessageSize();
if (size < minimumMessageSize || size == 0) {
- size = minimumMessageSize;
- if (marshalledProperties != null) {
- size += marshalledProperties.getLength();
- }
- if (content != null) {
- size += content.getLength();
- }
+ marshalPBProps();
+ size = pb.serializedSizeFramed();
}
return size;
}
@@ -669,11 +708,11 @@
* @openwire:property version=2 cache=true
*/
public boolean isDroppable() {
- return droppable;
+ return pb.getDroppable();
}
public void setDroppable(boolean droppable) {
- this.droppable = droppable;
+ this.pb.setDroppable(droppable);
}
/**
@@ -683,11 +722,20 @@
* @openwire:property version=3 cache=true
*/
public BrokerId[] getCluster() {
- return cluster;
+ BrokerId rc[]=null;
+ if( pb.hasCluster() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getClusterList());
+ }
+ return rc;
}
- public void setCluster(BrokerId[] cluster) {
- this.cluster = cluster;
+ public void setCluster(BrokerId[] brokerPath) {
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setClusterList(rc);
+ } else {
+ pb.clearCluster();
+ }
}
public boolean isMessage() {
@@ -698,25 +746,30 @@
* @openwire:property version=3
*/
public long getBrokerInTime() {
- return this.brokerInTime;
+ return this.pb.getBrokerInTime();
}
public void setBrokerInTime(long brokerInTime) {
- this.brokerInTime = brokerInTime;
+ this.pb.setBrokerInTime(brokerInTime);
}
/**
* @openwire:property version=3
*/
public long getBrokerOutTime() {
- return this.brokerOutTime;
+ return this.pb.getBrokerOutTime();
}
public void setBrokerOutTime(long brokerOutTime) {
- this.brokerOutTime = brokerOutTime;
+ this.pb.setBrokerOutTime(brokerOutTime);
}
public boolean isDropped() {
return false;
}
+
+ public PBMessage getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.command.PBMessageAck.PBAckType;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -59,35 +61,30 @@
*/
public static final byte INDIVIDUAL_ACK_TYPE = 4;
- protected byte ackType;
- protected ConsumerId consumerId;
- protected MessageId firstMessageId;
- protected MessageId lastMessageId;
- protected ActiveMQDestination destination;
- protected TransactionId transactionId;
- protected int messageCount;
+ protected PBMessageAck pb = new PBMessageAck();
protected transient String consumerKey;
+ private ActiveMQDestination destination;
+
public MessageAck() {
}
public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
- this.ackType = ackType;
- this.consumerId = md.getConsumerId();
- this.destination = md.getDestination();
- this.lastMessageId = md.getMessage().getMessageId();
- this.messageCount = messageCount;
+ setAckType(ackType);
+ setConsumerId(md.getConsumerId());
+ setDestination(md.getDestination());
+ setLastMessageId(md.getMessage().getMessageId());
+ setMessageCount(messageCount);
+ }
+
+ public MessageAck(PBMessageAck pb) {
+ this.pb = pb;
}
public void copy(MessageAck copy) {
super.copy(copy);
- copy.firstMessageId = firstMessageId;
- copy.lastMessageId = lastMessageId;
- copy.destination = destination;
- copy.transactionId = transactionId;
- copy.ackType = ackType;
- copy.consumerId = consumerId;
+ copy.pb = pb.clone();
}
public byte getDataStructureType() {
@@ -99,93 +96,129 @@
}
public boolean isPoisonAck() {
- return ackType == POSION_ACK_TYPE;
+ return pb.getAckType() == PBMessageAck.PBAckType.POSION_ACK_TYPE;
}
public boolean isStandardAck() {
- return ackType == STANDARD_ACK_TYPE;
+ return pb.getAckType() == PBMessageAck.PBAckType.STANDARD_ACK_TYPE;
}
public boolean isDeliveredAck() {
- return ackType == DELIVERED_ACK_TYPE;
+ return pb.getAckType() == PBMessageAck.PBAckType.DELIVERED_ACK_TYPE;
}
public boolean isRedeliveredAck() {
- return ackType == REDELIVERED_ACK_TYPE;
+ return pb.getAckType() == PBMessageAck.PBAckType.REDELIVERED_ACK_TYPE;
}
public boolean isIndividualAck() {
- return ackType == INDIVIDUAL_ACK_TYPE;
+ return pb.getAckType() == PBMessageAck.PBAckType.INDIVIDUAL_ACK_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination==null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination = null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ pb.setDestination(destination.getPB());
}
/**
* @openwire:property version=1 cache=true
*/
public TransactionId getTransactionId() {
- return transactionId;
+ if( pb.hasTransactionId() ) {
+ return PBConversionSupport.convert(pb.getTransactionId());
+ }
+ return null;
}
public void setTransactionId(TransactionId transactionId) {
- this.transactionId = transactionId;
+ if( transactionId==null ) {
+ pb.clearTransactionId();
+ } else {
+ this.pb.setTransactionId(PBConversionSupport.convert(transactionId));
+ }
}
public boolean isInTransaction() {
- return transactionId != null;
+ return this.pb.hasTransactionId();
}
/**
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId == null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public byte getAckType() {
- return ackType;
+ return (byte) pb.getAckType().getNumber();
}
public void setAckType(byte ackType) {
- this.ackType = ackType;
+ this.pb.setAckType(PBAckType.valueOf(ackType));
}
/**
* @openwire:property version=1
*/
public MessageId getFirstMessageId() {
- return firstMessageId;
+ if( pb.hasFirstMessageId() ) {
+ return new MessageId(pb.getFirstMessageId());
+ }
+ return null;
}
public void setFirstMessageId(MessageId firstMessageId) {
- this.firstMessageId = firstMessageId;
+ if( firstMessageId==null ) {
+ pb.clearFirstMessageId();
+ } else {
+ pb.setFirstMessageId(firstMessageId.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public MessageId getLastMessageId() {
- return lastMessageId;
+ if( pb.hasLastMessageId() ) {
+ return new MessageId(pb.getLastMessageId());
+ }
+ return null;
}
public void setLastMessageId(MessageId lastMessageId) {
- this.lastMessageId = lastMessageId;
+ if( lastMessageId==null ) {
+ pb.clearLastMessageId();
+ } else {
+ pb.setLastMessageId(lastMessageId.getPB());
+ }
}
/**
@@ -194,11 +227,11 @@
* @openwire:property version=1
*/
public int getMessageCount() {
- return messageCount;
+ return pb.getMessageCount();
}
public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
+ this.pb.setMessageCount(messageCount);
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -214,4 +247,8 @@
setMessageCount(1);
}
+ public PBMessageAck getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -27,15 +28,23 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
- protected ConsumerId consumerId;
- protected ActiveMQDestination destination;
- protected Message message;
- protected int redeliveryCounter;
+ PBMessageDispatch pb = new PBMessageDispatch();
protected transient long deliverySequenceId;
protected transient Object consumer;
protected transient Runnable transmitCallback;
+ private ActiveMQDestination destination;
+
+ private Message message;
+
+ public MessageDispatch() {
+ }
+
+ public MessageDispatch(PBMessageDispatch pb) {
+ this.pb = pb;
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -48,33 +57,64 @@
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId==null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination==null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination = null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ if( destination ==null ) {
+ pb.clearDestination();
+ } else {
+ this.pb.setDestination(destination.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public Message getMessage() {
+ if( pb.hasMessage() ) {
+ if( message == null ) {
+ message = PBConversionSupport.convert(pb.getMessage());
+ }
+ } else {
+ message = null;
+ }
return message;
}
public void setMessage(Message message) {
this.message = message;
+ if( message == null ) {
+ pb.clearMessage();
+ } else {
+ this.pb.setMessage(message.getPBCommand());
+ }
}
public long getDeliverySequenceId() {
@@ -89,11 +129,11 @@
* @openwire:property version=1
*/
public int getRedeliveryCounter() {
- return redeliveryCounter;
+ return pb.getRedeliveryCounter();
}
- public void setRedeliveryCounter(int deliveryCounter) {
- this.redeliveryCounter = deliveryCounter;
+ public void setRedeliveryCounter(int redeliveryCounter) {
+ this.pb.setRedeliveryCounter(redeliveryCounter);
}
public Object getConsumer() {
@@ -116,4 +156,8 @@
this.transmitCallback = transmitCallback;
}
+ public PBMessageDispatch getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -26,11 +27,8 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
- protected ConsumerId consumerId;
- protected ActiveMQDestination destination;
- protected MessageId messageId;
- protected long deliverySequenceId;
-
+ protected PBMessageDispatchNotification pb = new PBMessageDispatchNotification();
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -43,22 +41,36 @@
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId==null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
- return destination;
+ if( pb.hasDestination() ) {
+ return PBConversionSupport.convert(pb.getDestination());
+ }
+ return null;
}
public void setDestination(ActiveMQDestination destination) {
- this.destination = destination;
+ if( destination==null ) {
+ pb.clearDestination();
+ } else {
+ this.pb.setDestination(destination.getPB());
+ }
}
/**
@@ -66,11 +78,11 @@
*/
public long getDeliverySequenceId() {
- return deliverySequenceId;
+ return pb.getDeliverySequenceId();
}
public void setDeliverySequenceId(long deliverySequenceId) {
- this.deliverySequenceId = deliverySequenceId;
+ this.pb.setDeliverySequenceId(deliverySequenceId);
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -81,11 +93,22 @@
* @openwire:property version=1
*/
public MessageId getMessageId() {
- return messageId;
+ if( pb.hasMessageId() ) {
+ return new MessageId(pb.getMessageId());
+ }
+ return null;
}
public void setMessageId(MessageId messageId) {
- this.messageId = messageId;
+ if( messageId ==null ) {
+ pb.clearMessageId();
+ } else {
+ this.pb.setMessageId(messageId.getPB());
+ }
+ }
+
+ public PBMessageDispatchNotification getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java Wed Oct 15 12:12:26 2008
@@ -24,20 +24,17 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
- protected ProducerId producerId;
- protected long producerSequenceId;
- protected long brokerSequenceId;
-
private transient String key;
private transient int hashCode;
+ protected PBMessageId pb = new PBMessageId();
+
public MessageId() {
- this.producerId = new ProducerId();
}
public MessageId(ProducerInfo producerInfo, long producerSequenceId) {
- this.producerId = producerInfo.getProducerId();
- this.producerSequenceId = producerSequenceId;
+ this.pb.setProducerId(producerInfo.getProducerId().getPB());
+ this.pb.setId(producerSequenceId);
}
public MessageId(String messageKey) {
@@ -49,8 +46,12 @@
}
public MessageId(ProducerId producerId, long producerSequenceId) {
- this.producerId = producerId;
- this.producerSequenceId = producerSequenceId;
+ setProducerId(producerId);
+ setProducerSequenceId(producerSequenceId);
+ }
+
+ public MessageId(PBMessageId pb) {
+ this.pb = pb;
}
/**
@@ -61,10 +62,10 @@
// Parse off the sequenceId
int p = messageKey.lastIndexOf(":");
if (p >= 0) {
- producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
+ setProducerSequenceId( Long.parseLong(messageKey.substring(p + 1)) );
messageKey = messageKey.substring(0, p);
}
- producerId = new ProducerId(messageKey);
+ setProducerId(new ProducerId(messageKey));
}
/**
@@ -89,19 +90,19 @@
}
MessageId id = (MessageId)o;
- return producerSequenceId == id.producerSequenceId && producerId.equals(id.producerId);
+ return pb.equals(id.pb);
}
public int hashCode() {
if (hashCode == 0) {
- hashCode = producerId.hashCode() ^ (int)producerSequenceId;
+ hashCode = pb.hashCode();
}
return hashCode;
}
public String toString() {
if (key == null) {
- key = producerId.toString() + ":" + producerSequenceId;
+ key = getProducerId() + ":" + getProducerSequenceId();
}
return key;
}
@@ -110,33 +111,33 @@
* @openwire:property version=1 cache=true
*/
public ProducerId getProducerId() {
- return producerId;
+ return new ProducerId(pb.getProducerId());
}
public void setProducerId(ProducerId producerId) {
- this.producerId = producerId;
+ pb.setProducerId(producerId.getPB());
}
/**
* @openwire:property version=1
*/
public long getProducerSequenceId() {
- return producerSequenceId;
+ return pb.getId();
}
public void setProducerSequenceId(long producerSequenceId) {
- this.producerSequenceId = producerSequenceId;
+ pb.setId(producerSequenceId);
}
/**
* @openwire:property version=1
*/
public long getBrokerSequenceId() {
- return brokerSequenceId;
+ return pb.getBrokerSequenceId();
}
public void setBrokerSequenceId(long brokerSequenceId) {
- this.brokerSequenceId = brokerSequenceId;
+ this.pb.setBrokerSequenceId(brokerSequenceId);
}
public boolean isMarshallAware() {
@@ -144,9 +145,9 @@
}
public MessageId copy() {
- MessageId copy = new MessageId(producerId, producerSequenceId);
+ MessageId copy = new MessageId();
+ copy.pb = pb.clone();
copy.key = key;
- copy.brokerSequenceId = brokerSequenceId;
return copy;
}
@@ -162,4 +163,8 @@
}
return result;
}
+
+ public PBMessageId getPB() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -29,11 +30,9 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
- protected ConsumerId consumerId;
- protected ActiveMQDestination destination;
- protected long timeout;
- private MessageId messageId;
- private String correlationId;
+ protected PBMessagePull pb = new PBMessagePull();
+
+ private ActiveMQDestination destination;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -55,33 +54,52 @@
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId==null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination==null) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination=null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ if( destination==null ) {
+ pb.clearDestination();
+ } else {
+ this.pb.setDestination(destination.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public long getTimeout() {
- return timeout;
+ return pb.getTimeout();
}
public void setTimeout(long timeout) {
- this.timeout = timeout;
+ this.pb.setTimeout(timeout);
}
/**
@@ -91,11 +109,11 @@
* @openwire:property version=3
*/
public String getCorrelationId() {
- return correlationId;
+ return pb.getCorrelationId();
}
public void setCorrelationId(String correlationId) {
- this.correlationId = correlationId;
+ this.pb.setCorrelationId(correlationId);
}
@@ -106,10 +124,21 @@
* @openwire:property version=3
*/
public MessageId getMessageId() {
- return messageId;
+ if( pb.hasMessageId() ) {
+ return new MessageId(pb.getMessageId());
+ }
+ return null;
}
public void setMessageId(MessageId messageId) {
- this.messageId = messageId;
+ if( messageId == null ) {
+ pb.clearMessageId();
+ } else {
+ this.pb.setMessageId(messageId.getPB());
+ }
+ }
+
+ public PBMessagePull getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java Wed Oct 15 12:12:26 2008
@@ -18,6 +18,8 @@
import org.apache.activemq.state.CommandVisitor;
+import com.google.protobuf.ByteString;
+
/**
* Represents a partial command; a large command that has been split up into
* pieces.
@@ -29,8 +31,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND;
- private int commandId;
- private byte[] data;
+ protected PBPartialCommand pb = new PBPartialCommand();
private transient Endpoint from;
private transient Endpoint to;
@@ -46,11 +47,11 @@
* @openwire:property version=1
*/
public int getCommandId() {
- return commandId;
+ return pb.getCommandId();
}
public void setCommandId(int commandId) {
- this.commandId = commandId;
+ this.pb.setCommandId(commandId);
}
/**
@@ -59,11 +60,18 @@
* @openwire:property version=1 mandatory=true
*/
public byte[] getData() {
- return data;
+ if( pb.hasData() ) {
+ return pb.getData().toByteArray();
+ }
+ return null;
}
public void setData(byte[] data) {
- this.data = data;
+ if( data == null ) {
+ pb.clearData();
+ } else {
+ this.pb.setData(ByteString.copyFrom(data));
+ }
}
public Endpoint getFrom() {
@@ -131,10 +139,18 @@
public String toString() {
int size = 0;
- if (data != null) {
- size = data.length;
+ if (pb.hasData()) {
+ size = pb.getData().size();
}
- return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
+ return "PartialCommand[id: " + getCommandId() + " data: " + size + " byte(s)]";
+ }
+
+ public PBPartialCommand getPBCommand() {
+ return pb;
+ }
+
+ public Command getCommandObject() {
+ return null;
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java Wed Oct 15 12:12:26 2008
@@ -31,21 +31,19 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK;
- protected ProducerId producerId;
- protected int size;
+ protected PBProducerAck pb = new PBProducerAck();
public ProducerAck() {
}
public ProducerAck(ProducerId producerId, int size) {
- this.producerId = producerId;
- this.size = size;
+ setProducerId(producerId);
+ setSize(size);
}
public void copy(ProducerAck copy) {
super.copy(copy);
- copy.producerId = producerId;
- copy.size = size;
+ copy.pb = pb.clone();
}
public byte getDataStructureType() {
@@ -62,11 +60,18 @@
* @openwire:property version=3
*/
public ProducerId getProducerId() {
- return producerId;
+ if( pb.hasProducerId() ) {
+ return new ProducerId(pb.getProducerId());
+ }
+ return null;
}
public void setProducerId(ProducerId producerId) {
- this.producerId = producerId;
+ if( producerId==null ) {
+ pb.clearProducerId();
+ } else {
+ this.pb.setProducerId(producerId.getPB());
+ }
}
/**
@@ -75,11 +80,15 @@
* @openwire:property version=3
*/
public int getSize() {
- return size;
+ return pb.getSize();
}
public void setSize(int size) {
- this.size = size;
+ this.pb.setSize(size);
+ }
+
+ public PBProducerAck getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBCommand;
+
+
/**
* @openwire:marshaller code="123"
* @version $Revision$
@@ -24,9 +27,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
- protected String connectionId;
- protected long sessionId;
- protected long value;
+ protected PBProducerId pb = new PBProducerId();
protected transient int hashCode;
protected transient String key;
@@ -36,27 +37,28 @@
}
public ProducerId(SessionId sessionId, long producerId) {
- this.connectionId = sessionId.getConnectionId();
- this.sessionId = sessionId.getValue();
- this.value = producerId;
+ this.pb.setSessionId(sessionId.getPB());
+ this.pb.setId(producerId);
}
public ProducerId(ProducerId id) {
- this.connectionId = id.getConnectionId();
- this.sessionId = id.getSessionId();
- this.value = id.getValue();
+ this.pb = id.pb.clone();
}
public ProducerId(String producerKey) {
// Parse off the producerId
int p = producerKey.lastIndexOf(":");
if (p >= 0) {
- value = Long.parseLong(producerKey.substring(p + 1));
+ setValue(Long.parseLong(producerKey.substring(p + 1)));
producerKey = producerKey.substring(0, p);
}
setProducerSessionKey(producerKey);
}
+ public ProducerId(PBProducerId producerId) {
+ pb = producerId;
+ }
+
public SessionId getParentId() {
if (parentId == null) {
parentId = new SessionId(this);
@@ -66,7 +68,7 @@
public int hashCode() {
if (hashCode == 0) {
- hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value;
+ hashCode = pb.hashCode();
}
return hashCode;
}
@@ -78,8 +80,7 @@
if (o == null || o.getClass() != ProducerId.class) {
return false;
}
- ProducerId id = (ProducerId)o;
- return sessionId == id.sessionId && value == id.value && connectionId.equals(id.connectionId);
+ return this.pb.equals(((ProducerId)o).pb);
}
/**
@@ -89,16 +90,16 @@
// Parse off the value
int p = sessionKey.lastIndexOf(":");
if (p >= 0) {
- sessionId = Long.parseLong(sessionKey.substring(p + 1));
+ setSessionId( Long.parseLong(sessionKey.substring(p + 1)) );
sessionKey = sessionKey.substring(0, p);
}
// The rest is the value
- connectionId = sessionKey;
+ setConnectionId(sessionKey);
}
public String toString() {
if (key == null) {
- key = connectionId + ":" + sessionId + ":" + value;
+ key = getConnectionId() + ":" + getSessionId() + ":" + getValue();
}
return key;
}
@@ -111,36 +112,40 @@
* @openwire:property version=1 cache=true
*/
public String getConnectionId() {
- return connectionId;
+ return this.pb.getSessionId().getConnectionId();
}
public void setConnectionId(String connectionId) {
- this.connectionId = connectionId;
+ this.pb.getSessionId().setConnectionId(connectionId);
}
/**
* @openwire:property version=1
*/
public long getValue() {
- return value;
+ return this.pb.getId();
}
public void setValue(long producerId) {
- this.value = producerId;
+ this.pb.setId(producerId);
}
/**
* @openwire:property version=1
*/
public long getSessionId() {
- return sessionId;
+ return this.pb.getSessionId().getId();
}
public void setSessionId(long sessionId) {
- this.sessionId = sessionId;
+ this.pb.getSessionId().setId(sessionId);
}
public boolean isMarshallAware() {
return false;
}
+
+ public PBProducerId getPB() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.command;
+import java.util.ArrayList;
+
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -27,21 +30,23 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
- protected ProducerId producerId;
- protected ActiveMQDestination destination;
- protected BrokerId[] brokerPath;
- protected boolean dispatchAsync;
- protected int windowSize;
+ protected PBProducerInfo pb = new PBProducerInfo();
+
+ private ActiveMQDestination destination;
public ProducerInfo() {
}
public ProducerInfo(ProducerId producerId) {
- this.producerId = producerId;
+ setProducerId(producerId);
}
public ProducerInfo(SessionInfo sessionInfo, long producerId) {
- this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
+ setProducerId(new ProducerId(sessionInfo.getSessionId(), producerId));
+ }
+
+ public ProducerInfo(PBProducerInfo pb) {
+ this.pb=pb;
}
public ProducerInfo copy() {
@@ -52,8 +57,7 @@
public void copy(ProducerInfo info) {
super.copy(info);
- info.producerId = producerId;
- info.destination = destination;
+ info.pb = pb.clone();
}
public byte getDataStructureType() {
@@ -64,22 +68,41 @@
* @openwire:property version=1 cache=true
*/
public ProducerId getProducerId() {
- return producerId;
+ if( pb.hasProducerId() ) {
+ return new ProducerId(pb.getProducerId());
+ }
+ return null;
}
public void setProducerId(ProducerId producerId) {
- this.producerId = producerId;
+ if( producerId==null ) {
+ pb.clearProducerId();
+ } else {
+ this.pb.setProducerId(producerId.getPB());
+ }
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination == null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination = null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ if( destination==null ) {
+ this.pb.clearDestination();
+ } else {
+ this.pb.setDestination(destination.getPB());
+ }
}
public RemoveInfo createRemoveCommand() {
@@ -94,11 +117,20 @@
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
- return brokerPath;
+ BrokerId rc[]=null;
+ if( pb.hasBrokerPath() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+ }
+ return rc;
}
public void setBrokerPath(BrokerId[] brokerPath) {
- this.brokerPath = brokerPath;
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setBrokerPathList(rc);
+ } else {
+ pb.clearBrokerPath();
+ }
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -113,11 +145,11 @@
* @openwire:property version=2
*/
public boolean isDispatchAsync() {
- return dispatchAsync;
+ return pb.getDispatchAsync();
}
public void setDispatchAsync(boolean dispatchAsync) {
- this.dispatchAsync = dispatchAsync;
+ this.pb.setDispatchAsync(dispatchAsync);
}
/**
@@ -128,11 +160,15 @@
* @openwire:property version=3
*/
public int getWindowSize() {
- return windowSize;
+ return pb.getWindowSize();
}
public void setWindowSize(int windowSize) {
- this.windowSize = windowSize;
+ this.pb.setWindowSize(windowSize);
+ }
+
+ public PBProducerInfo getPBCommand() {
+ return pb;
}
}
Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.pbwire.PBCommand;
+
+public interface ProtocolBufferBacked {
+
+ @SuppressWarnings("unchecked")
+ PBCommand getPBCommand();
+
+ Command getCommandObject();
+
+}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java Wed Oct 15 12:12:26 2008
@@ -30,13 +30,17 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
- protected DataStructure objectId;
+ protected PBRemoveInfo pb = new PBRemoveInfo();
public RemoveInfo() {
}
public RemoveInfo(DataStructure objectId) {
- this.objectId = objectId;
+ setObjectId(objectId);
+ }
+
+ public RemoveInfo(PBRemoveInfo pb) {
+ this.pb=pb;
}
public byte getDataStructureType() {
@@ -47,14 +51,43 @@
* @openwire:property version=1 cache=true
*/
public DataStructure getObjectId() {
- return objectId;
+ if( pb.hasConnectionId() ) {
+ return new ConnectionId(pb.getConnectionId());
+ }
+ if( pb.hasSessionId() ) {
+ return new SessionId(pb.getSessionId());
+ }
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ if( pb.hasProducerId() ) {
+ return new ProducerId(pb.getProducerId());
+ }
+ return null;
}
public void setObjectId(DataStructure objectId) {
- this.objectId = objectId;
+ pb.clear();
+ switch( objectId.getDataStructureType() ) {
+ case ConnectionId.DATA_STRUCTURE_TYPE:
+ pb.setConnectionId(((ConnectionId)objectId).getValue());
+ break;
+ case SessionId.DATA_STRUCTURE_TYPE:
+ pb.setSessionId(((SessionId)objectId).getPB());
+ break;
+ case ConsumerId.DATA_STRUCTURE_TYPE:
+ pb.setConsumerId(((ConsumerId)objectId).getPB());
+ break;
+ case ProducerId.DATA_STRUCTURE_TYPE:
+ pb.setProducerId(((ProducerId)objectId).getPB());
+ break;
+ default:
+ throw new RuntimeException("Unknown remove command type: " + objectId.getDataStructureType());
+ }
}
public Response visit(CommandVisitor visitor) throws Exception {
+ DataStructure objectId = getObjectId();
switch (objectId.getDataStructureType()) {
case ConnectionId.DATA_STRUCTURE_TYPE:
return visitor.processRemoveConnection((ConnectionId)objectId);
@@ -73,28 +106,32 @@
* Returns true if this event is for a removed connection
*/
public boolean isConnectionRemove() {
- return objectId.getDataStructureType() == ConnectionId.DATA_STRUCTURE_TYPE;
+ return pb.hasConnectionId();
}
/**
* Returns true if this event is for a removed session
*/
public boolean isSessionRemove() {
- return objectId.getDataStructureType() == SessionId.DATA_STRUCTURE_TYPE;
+ return pb.hasSessionId();
}
/**
* Returns true if this event is for a removed consumer
*/
public boolean isConsumerRemove() {
- return objectId.getDataStructureType() == ConsumerId.DATA_STRUCTURE_TYPE;
+ return pb.hasConsumerId();
}
/**
* Returns true if this event is for a removed producer
*/
public boolean isProducerRemove() {
- return objectId.getDataStructureType() == ProducerId.DATA_STRUCTURE_TYPE;
+ return pb.hasProducerId();
+ }
+
+ public PBRemoveInfo getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java Wed Oct 15 12:12:26 2008
@@ -26,9 +26,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
- protected ConnectionId connectionId;
- protected String clientId;
- protected String subscriptionName;
+ protected PBRemoveSubscriptionInfo pb = new PBRemoveSubscriptionInfo();
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -38,11 +36,18 @@
* @openwire:property version=1 cache=true
*/
public ConnectionId getConnectionId() {
- return connectionId;
+ if( pb.hasConnectionId() ) {
+ return new ConnectionId(pb.getConnectionId());
+ }
+ return null;
}
public void setConnectionId(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ if( connectionId==null ) {
+ pb.clearConnectionId();
+ } else {
+ this.pb.setConnectionId(connectionId.getValue());
+ }
}
/**
@@ -50,37 +55,41 @@
* @deprecated
*/
public String getSubcriptionName() {
- return subscriptionName;
+ return getSubscriptionName();
}
/**
* @deprecated
*/
public void setSubcriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
+ setSubscriptionName(subscriptionName);
}
public String getSubscriptionName() {
- return subscriptionName;
+ return pb.getSubscriptionName();
}
public void setSubscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
+ this.pb.setSubscriptionName(subscriptionName);
}
/**
* @openwire:property version=1
*/
public String getClientId() {
- return clientId;
+ return pb.getClientId();
}
public void setClientId(String clientId) {
- this.clientId = clientId;
+ this.pb.setClientId(clientId);
}
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processRemoveSubscription(this);
}
+ public PBRemoveSubscriptionInfo getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java Wed Oct 15 12:12:26 2008
@@ -31,11 +31,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
- private String producerId;
- private int firstAckNumber;
- private int lastAckNumber;
- private int firstNakNumber;
- private int lastNakNumber;
+ protected PBReplayCommand pb = new PBReplayCommand();
public ReplayCommand() {
}
@@ -45,7 +41,7 @@
}
public String getProducerId() {
- return producerId;
+ return pb.getProducerId();
}
/**
@@ -54,11 +50,11 @@
* @openwire:property version=1 cache=false
*/
public void setProducerId(String producerId) {
- this.producerId = producerId;
+ this.pb.setProducerId(producerId);
}
public int getFirstAckNumber() {
- return firstAckNumber;
+ return pb.getFirstAckNumber();
}
/**
@@ -68,11 +64,11 @@
* @openwire:property version=1
*/
public void setFirstAckNumber(int firstSequenceNumber) {
- this.firstAckNumber = firstSequenceNumber;
+ this.pb.setFirstAckNumber(firstSequenceNumber);
}
public int getLastAckNumber() {
- return lastAckNumber;
+ return pb.getLastAckNumber();
}
/**
@@ -81,8 +77,8 @@
*
* @openwire:property version=1
*/
- public void setLastAckNumber(int lastSequenceNumber) {
- this.lastAckNumber = lastSequenceNumber;
+ public void setLastAckNumber(int lastAckNumber) {
+ this.pb.setLastAckNumber(lastAckNumber);
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -95,11 +91,11 @@
* @openwire:property version=1
*/
public int getFirstNakNumber() {
- return firstNakNumber;
+ return pb.getFirstNakNumber();
}
public void setFirstNakNumber(int firstNakNumber) {
- this.firstNakNumber = firstNakNumber;
+ this.pb.setFirstNakNumber(firstNakNumber);
}
/**
@@ -108,15 +104,19 @@
* @openwire:property version=1
*/
public int getLastNakNumber() {
- return lastNakNumber;
+ return pb.getLastNakNumber();
}
public void setLastNakNumber(int lastNakNumber) {
- this.lastNakNumber = lastNakNumber;
+ this.pb.setLastNakNumber(lastNakNumber);
}
public String toString() {
return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}";
}
+
+ public PBReplayCommand getPBCommand() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java Wed Oct 15 12:12:26 2008
@@ -25,7 +25,15 @@
public class Response extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
- int correlationId;
+
+ protected PBResponse pb = new PBResponse();
+
+ public Response() {
+
+ }
+ public Response(PBResponse pb) {
+ this.pb = pb;
+ }
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -35,11 +43,11 @@
* @openwire:property version=1
*/
public int getCorrelationId() {
- return correlationId;
+ return pb.getCorrelationId();
}
- public void setCorrelationId(int responseId) {
- this.correlationId = responseId;
+ public void setCorrelationId(int correlationId) {
+ this.pb.setCorrelationId(correlationId);
}
public boolean isResponse() {
@@ -53,4 +61,8 @@
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
+
+ public PBResponse getPBCommand() {
+ return pb;
+ }
}