You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/15 21:12:28 UTC
svn commit: r704995 [1/3] - in /activemq/sandbox/chirino-pb/activemq-core:
./ src/main/java/org/apache/activemq/advisory/
src/main/java/org/apache/activemq/broker/
src/main/java/org/apache/activemq/command/
src/main/java/org/apache/activemq/pbwire/ src...
Author: chirino
Date: Wed Oct 15 12:12:26 2008
New Revision: 704995
URL: http://svn.apache.org/viewvc?rev=704995&view=rev
Log:
First pass of a protocol buffer based marshalling impl
Added:
activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProtocolBufferBacked.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java (with props)
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java (with props)
activemq/sandbox/chirino-pb/activemq-core/src/main/proto/
activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto
activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb
activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/
activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java
activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java
Modified:
activemq/sandbox/chirino-pb/activemq-core/pom.xml
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTopicAck.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalTransaction.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/LocalTransactionId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Response.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
Added: activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt (added)
+++ activemq/sandbox/chirino-pb/activemq-core/PBWIRE-README.txt Wed Oct 15 12:12:26 2008
@@ -0,0 +1,13 @@
+=======================================================================
+ Implementation Notes for PB Wire Format
+=======================================================================
+
+- Need to see the actual number of byte[] copies that are done for each
+ marshall and unmarshall. I suspect that we will need custom
+ CodedInputStream/CodedOutputStream implementations so we can do more
+ efficient scatter/gather-style byte[] handling.
+
+- There are several objects which just wrap a String value; we might
+ need to consider refactoring them out since the wrapper adds no value:
+ * BrokerId
+ * ConnectionId
\ No newline at end of file
Modified: activemq/sandbox/chirino-pb/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/pom.xml?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/pom.xml (original)
+++ activemq/sandbox/chirino-pb/activemq-core/pom.xml Wed Oct 15 12:12:26 2008
@@ -91,6 +91,13 @@
<artifactId>camel-jms</artifactId>
<optional>true</optional>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
@@ -343,6 +350,20 @@
<build>
<plugins>
+
+ <plugin>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- Configure which tests are included/excuded -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Oct 15 12:12:26 2008
@@ -418,7 +418,9 @@
advisoryMessage.setPersistent(false);
advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
- advisoryMessage.setTargetConsumerId(targetConsumerId);
+ if( targetConsumerId!=null ) {
+ advisoryMessage.setTargetConsumerId(targetConsumerId);
+ }
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Oct 15 12:12:26 2008
@@ -209,6 +209,7 @@
}
public void serviceTransportException(IOException e) {
+ LOG.warn("Transport failed:",e);
BrokerService bService=connector.getBrokerService();
if(bService.isShutdownOnSlaveFailure()){
if(brokerInfo!=null){
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Wed Oct 15 12:12:26 2008
@@ -33,6 +33,7 @@
import javax.jms.MessageNotReadableException;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -99,6 +100,14 @@
protected transient DataInputStream dataIn;
protected transient int length;
+ public ActiveMQBytesMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.BYTES_MESSAGE));
+ }
+
+ public ActiveMQBytesMessage(PBMessage message) {
+ super(message);
+ }
+
public Message copy() {
ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
copy(copy);
@@ -123,7 +132,7 @@
if (dataOut != null) {
dataOut.close();
ByteSequence bs = bytesOut.toByteSequence();
- if (compressed) {
+ if (isCompressed()) {
int pos = bs.offset;
ByteSequenceData.writeIntBig(bs, length);
bs.offset = pos;
@@ -784,7 +793,7 @@
throw JMSExceptionSupport.create(e);
}
length = 0;
- compressed = true;
+ setCompressed(true);
Deflater deflater = new Deflater(Deflater.BEST_SPEED);
os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
public void write(byte[] arg0) throws IOException {
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Wed Oct 15 12:12:26 2008
@@ -62,26 +62,27 @@
private static final long serialVersionUID = -3885260014960795889L;
- protected String physicalName;
-
protected transient ActiveMQDestination[] compositeDestinations;
protected transient String[] destinationPaths;
protected transient boolean isPattern;
protected transient int hashValue;
protected Map<String, String> options;
+ protected PBActiveMQDestination pb;
- public ActiveMQDestination() {
- }
- protected ActiveMQDestination(String name) {
- setPhysicalName(name);
- }
-
- public ActiveMQDestination(ActiveMQDestination composites[]) {
+ public ActiveMQDestination(PBActiveMQDestination pb, ActiveMQDestination composites[]) {
+ this(pb);
setCompositeDestinations(composites);
}
+ public ActiveMQDestination(PBActiveMQDestination pb) {
+ this.pb = pb;
+ if( pb.hasName() ) {
+ setPhysicalName(pb.getName());
+ }
+ }
+
// static helper methods for working with destinations
// -------------------------------------------------------------------------
public static ActiveMQDestination createDestination(String name, byte defaultType) {
@@ -185,14 +186,14 @@
sb.append(destinations[i].getQualifiedName());
}
}
- physicalName = sb.toString();
+ pb.setName(sb.toString());
}
public String getQualifiedName() {
if (isComposite()) {
- return physicalName;
+ return pb.getName();
}
- return getQualifiedPrefix() + physicalName;
+ return getQualifiedPrefix() + pb.getName();
}
protected abstract String getQualifiedPrefix();
@@ -201,7 +202,7 @@
* @openwire:property version=1
*/
public String getPhysicalName() {
- return physicalName;
+ return pb.getName();
}
public void setPhysicalName(String physicalName) {
@@ -233,7 +234,7 @@
throw new IllegalArgumentException("Invalid destination name: " + physicalName + ", it's options are not encoded properly: " + e);
}
}
- this.physicalName = physicalName;
+ this.pb.setName(physicalName);
this.destinationPaths = null;
this.hashValue = 0;
if (composite) {
@@ -268,7 +269,7 @@
}
List<String> l = new ArrayList<String>();
- StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
+ StringTokenizer iter = new StringTokenizer(pb.getName(), PATH_SEPERATOR);
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();
if (name.length() == 0) {
@@ -305,12 +306,12 @@
}
ActiveMQDestination d = (ActiveMQDestination)o;
- return physicalName.equals(d.physicalName);
+ return d.pb.equals(pb);
}
public int hashCode() {
if (hashValue == 0) {
- hashValue = physicalName.hashCode();
+ hashValue = pb.hashCode();
}
return hashValue;
}
@@ -368,4 +369,8 @@
public boolean isPattern() {
return isPattern;
}
+
+ public PBActiveMQDestination getPB() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Wed Oct 15 12:12:26 2008
@@ -34,6 +34,7 @@
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -101,6 +102,14 @@
protected transient Map<String, Object> map = new HashMap<String, Object>();
+ public ActiveMQMapMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.MAP_MESSAGE));
+ }
+
+ public ActiveMQMapMessage(PBMessage message) {
+ super(message);
+ }
+
public Message copy() {
ActiveMQMapMessage copy = new ActiveMQMapMessage();
copy(copy);
@@ -125,7 +134,7 @@
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
- compressed = true;
+ setCompressed(true);
os = new DeflaterOutputStream(os);
}
DataOutputStream dataOut = new DataOutputStream(os);
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Wed Oct 15 12:12:26 2008
@@ -32,6 +32,7 @@
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.filter.PropertyExpression;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.Callback;
@@ -49,6 +50,14 @@
protected transient Callback acknowledgeCallback;
+ public ActiveMQMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.EMPTY_MESSAGE));
+ }
+
+ public ActiveMQMessage(PBMessage message) {
+ super(message);
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -131,7 +140,7 @@
// so lets set the IDs to be 1
MessageId id = new MessageId();
id.setTextView(value);
- this.setMessageId(messageId);
+ this.setMessageId(id);
}
} else {
this.setMessageId(null);
@@ -638,4 +647,5 @@
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessage(this);
}
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Wed Oct 15 12:12:26 2008
@@ -31,6 +31,7 @@
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -70,6 +71,14 @@
protected transient Serializable object;
+ public ActiveMQObjectMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.OBJECT_MESSAGE));
+ }
+
+ public ActiveMQObjectMessage(PBMessage message) {
+ super(message);
+ }
+
public Message copy() {
ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
copy(copy);
@@ -90,7 +99,7 @@
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
- compressed = true;
+ setCompressed(true);
os = new DeflaterOutputStream(os);
}
DataOutputStream dataOut = new DataOutputStream(os);
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQQueue.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
import javax.jms.JMSException;
import javax.jms.Queue;
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
/**
*
* @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
@@ -33,10 +35,15 @@
private static final long serialVersionUID = -3885260014960795889L;
public ActiveMQQueue() {
+ super(new PBActiveMQDestination().setType(DestinationType.QUEUE));
}
public ActiveMQQueue(String name) {
- super(name);
+ super(new PBActiveMQDestination().setName(name).setType(DestinationType.QUEUE));
+ }
+
+ public ActiveMQQueue(PBActiveMQDestination pb) {
+ super(pb);
}
public byte getDataStructureType() {
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Wed Oct 15 12:12:26 2008
@@ -35,6 +35,7 @@
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -118,6 +119,14 @@
protected transient DataInputStream dataIn;
protected transient int remainingBytes = -1;
+ public ActiveMQStreamMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.STREAM_MESSAGE));
+ }
+
+ public ActiveMQStreamMessage(PBMessage message) {
+ super(message);
+ }
+
public Message copy() {
ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
copy(copy);
@@ -1113,7 +1122,7 @@
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
- compressed = true;
+ setCompressed(true);
os = new DeflaterOutputStream(os);
}
this.dataOut = new DataOutputStream(os);
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Wed Oct 15 12:12:26 2008
@@ -32,15 +32,13 @@
protected transient String connectionId;
protected transient int sequenceId;
- public ActiveMQTempDestination() {
- }
- public ActiveMQTempDestination(String name) {
- super(name);
+ public ActiveMQTempDestination(PBActiveMQDestination pb, ActiveMQDestination[] composites) {
+ super(pb, composites);
}
- public ActiveMQTempDestination(String connectionId, long sequenceId) {
- super(connectionId + ":" + sequenceId);
+ public ActiveMQTempDestination(PBActiveMQDestination pb) {
+ super(pb);
}
public boolean isTemporary() {
@@ -65,9 +63,9 @@
// Parse off the sequenceId off the end.
// this can fail if the temp destination is
// generated by another JMS system via the JMS<->JMS Bridge
- int p = this.physicalName.lastIndexOf(":");
+ int p = this.pb.getName().lastIndexOf(":");
if (p >= 0) {
- String seqStr = this.physicalName.substring(p + 1).trim();
+ String seqStr = this.pb.getName().substring(p + 1).trim();
if (seqStr != null && seqStr.length() > 0) {
try {
sequenceId = Integer.parseInt(seqStr);
@@ -75,7 +73,7 @@
LOG.debug("Did not parse sequence Id from " + physicalName);
}
// The rest should be the connection id.
- connectionId = this.physicalName.substring(0, p);
+ connectionId = this.pb.getName().substring(0, p);
}
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
/**
* @openwire:marshaller code="102"
* @version $Revision: 1.6 $
@@ -29,14 +31,19 @@
private static final long serialVersionUID = 6683049467527633867L;
public ActiveMQTempQueue() {
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE));
}
public ActiveMQTempQueue(String name) {
- super(name);
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE).setName(name));
}
public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
- super(connectionId.getValue(), sequenceId);
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_QUEUE).setName(connectionId+":"+sequenceId));
+ }
+
+ public ActiveMQTempQueue(PBActiveMQDestination destination) {
+ super(destination);
}
public byte getDataStructureType() {
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
/**
* @openwire:marshaller code="103"
* @version $Revision: 1.6 $
@@ -29,14 +31,19 @@
private static final long serialVersionUID = -4325596784597300253L;
public ActiveMQTempTopic() {
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC));
}
public ActiveMQTempTopic(String name) {
- super(name);
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC).setName(name));
}
public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
- super(connectionId.getValue(), sequenceId);
+ super(new PBActiveMQDestination().setType(DestinationType.TEMP_TOPIC).setName(connectionId+":"+sequenceId));
+ }
+
+ public ActiveMQTempTopic(PBActiveMQDestination destination) {
+ super(destination);
}
public byte getDataStructureType() {
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Wed Oct 15 12:12:26 2008
@@ -29,6 +29,7 @@
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.PBMessage.PBMessageType;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -46,6 +47,15 @@
protected String text;
+
+ public ActiveMQTextMessage() {
+ super(new PBMessage().setMessageType(PBMessageType.TEXT_MESSAGE));
+ }
+
+ public ActiveMQTextMessage(PBMessage message) {
+ super(message);
+ }
+
public Message copy() {
ActiveMQTextMessage copy = new ActiveMQTextMessage();
copy(copy);
@@ -110,7 +120,7 @@
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
- compressed = true;
+ setCompressed(true);
os = new DeflaterOutputStream(os);
}
DataOutputStream dataOut = new DataOutputStream(os);
@@ -137,7 +147,7 @@
}
public int getSize() {
- if (size == 0 && content == null && text != null) {
+ if (size == 0 && !pb.hasContent() && text != null) {
size = getMinimumMessageSize();
if (marshalledProperties != null) {
size += marshalledProperties.getLength();
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Wed Oct 15 12:12:26 2008
@@ -19,6 +19,8 @@
import javax.jms.JMSException;
import javax.jms.Topic;
+import org.apache.activemq.command.PBActiveMQDestination.DestinationType;
+
/**
* @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
* Destination"
@@ -31,10 +33,15 @@
private static final long serialVersionUID = 7300307405896488588L;
public ActiveMQTopic() {
+ super(new PBActiveMQDestination().setType(DestinationType.TOPIC));
}
public ActiveMQTopic(String name) {
- super(name);
+ super(new PBActiveMQDestination().setType(DestinationType.TOPIC).setName(name));
+ }
+
+ public ActiveMQTopic(PBActiveMQDestination destination) {
+ super(destination);
}
public byte getDataStructureType() {
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Wed Oct 15 12:12:26 2008
@@ -60,7 +60,7 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, BaseCommand.class);
+ return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
}
public boolean isWireFormatInfo() {
@@ -121,5 +121,8 @@
this.to = to;
}
+ public Command getCommandObject() {
+ return this;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.command;
+import java.util.ArrayList;
+
import org.apache.activemq.state.CommandVisitor;
/**
@@ -28,19 +30,16 @@
* @version $Revision: 1.7 $
*/
public class BrokerInfo extends BaseCommand {
+
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
- BrokerId brokerId;
- String brokerURL;
- boolean slaveBroker;
- boolean masterBroker;
- boolean faultTolerantConfiguration;
- boolean networkConnection;
- boolean duplexConnection;
- BrokerInfo peerBrokerInfos[];
- String brokerName;
- long connectionId;
- String brokerUploadUrl;
- String networkProperties;
+ PBBrokerInfo pb = new PBBrokerInfo();
+
+ public BrokerInfo() {
+ }
+
+ public BrokerInfo(PBBrokerInfo pb) {
+ this.pb = pb;
+ }
public boolean isBrokerInfo() {
return true;
@@ -54,44 +53,70 @@
* @openwire:property version=1 cache=true
*/
public BrokerId getBrokerId() {
- return brokerId;
+ if( pb.hasBrokerId() ) {
+ return new BrokerId(pb.getBrokerId());
+ }
+ return null;
}
public void setBrokerId(BrokerId brokerId) {
- this.brokerId = brokerId;
+ if( brokerId==null ) {
+ pb.clearBrokerId();
+ } else {
+ this.pb.setBrokerId(brokerId.getValue());
+ }
}
/**
* @openwire:property version=1
*/
public String getBrokerURL() {
- return brokerURL;
+ return pb.getBrokerUrl();
}
public void setBrokerURL(String brokerURL) {
- this.brokerURL = brokerURL;
+ this.pb.setBrokerUrl(brokerURL);
}
/**
* @openwire:property version=1 testSize=0
*/
public BrokerInfo[] getPeerBrokerInfos() {
- return peerBrokerInfos;
+ if( !pb.hasPeerBrokerInfos() ) {
+ return null;
+ }
+ BrokerInfo rc[]= new BrokerInfo[pb.getPeerBrokerInfosCount()];
+ for (int i = 0; i < rc.length; i++) {
+ rc[i] = new BrokerInfo(pb.getPeerBrokerInfos(i));
+ }
+ return rc;
}
public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
- this.peerBrokerInfos = peerBrokerInfos;
+ if( peerBrokerInfos == null ) {
+ pb.clearPeerBrokerInfos();
+ } else {
+ ArrayList<PBBrokerInfo> t = new ArrayList<PBBrokerInfo>(peerBrokerInfos.length);
+ for (int i = 0; i < peerBrokerInfos.length; i++) {
+ t.add(peerBrokerInfos[i].getPB());
+ }
+ pb.setPeerBrokerInfosList(t);
+ }
+ }
+
+ public PBBrokerInfo getPB() {
+ return pb;
}
/**
* @openwire:property version=1
*/
public String getBrokerName() {
- return brokerName;
+ return pb.getBrokerName();
}
public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
+ this.pb.setBrokerName(brokerName);
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -102,25 +127,25 @@
* @openwire:property version=1
*/
public boolean isSlaveBroker() {
- return slaveBroker;
+ return pb.getSlaveBroker();
}
public void setSlaveBroker(boolean slaveBroker) {
- this.slaveBroker = slaveBroker;
+ this.pb.setSlaveBroker(slaveBroker);
}
/**
* @openwire:property version=1
*/
public boolean isMasterBroker() {
- return masterBroker;
+ return pb.getMasterBroker();
}
/**
* @param masterBroker The masterBroker to set.
*/
public void setMasterBroker(boolean masterBroker) {
- this.masterBroker = masterBroker;
+ this.pb.setMasterBroker(masterBroker);
}
/**
@@ -128,14 +153,14 @@
* @return Returns the faultTolerantConfiguration.
*/
public boolean isFaultTolerantConfiguration() {
- return faultTolerantConfiguration;
+ return pb.getFaultTolerantConfiguration();
}
/**
* @param faultTolerantConfiguration The faultTolerantConfiguration to set.
*/
public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration) {
- this.faultTolerantConfiguration = faultTolerantConfiguration;
+ this.pb.setFaultTolerantConfiguration(faultTolerantConfiguration);
}
/**
@@ -143,14 +168,14 @@
* @return the duplexConnection
*/
public boolean isDuplexConnection() {
- return this.duplexConnection;
+ return this.pb.getDuplexConnection();
}
/**
* @param duplexConnection the duplexConnection to set
*/
public void setDuplexConnection(boolean duplexConnection) {
- this.duplexConnection = duplexConnection;
+ this.pb.setDuplexConnection(duplexConnection);
}
/**
@@ -158,14 +183,14 @@
* @return the networkConnection
*/
public boolean isNetworkConnection() {
- return this.networkConnection;
+ return this.pb.getNetworkConnection();
}
/**
* @param networkConnection the networkConnection to set
*/
public void setNetworkConnection(boolean networkConnection) {
- this.networkConnection = networkConnection;
+ this.pb.setNetworkConnection(networkConnection);
}
/**
@@ -174,11 +199,11 @@
* @openwire:property version=2
*/
public long getConnectionId() {
- return connectionId;
+ return pb.getConnectionId();
}
public void setConnectionId(long connectionId) {
- this.connectionId = connectionId;
+ this.pb.setConnectionId(connectionId);
}
/**
@@ -188,11 +213,11 @@
* @openwire:property version=3
*/
public String getBrokerUploadUrl() {
- return brokerUploadUrl;
+ return pb.getBrokerUploadUrl();
}
public void setBrokerUploadUrl(String brokerUploadUrl) {
- this.brokerUploadUrl = brokerUploadUrl;
+ this.pb.setBrokerUploadUrl(brokerUploadUrl); // must target the upload-URL field read by getBrokerUploadUrl(), not brokerUrl
}
/**
@@ -200,13 +225,17 @@
* @return the networkProperties
*/
public String getNetworkProperties() {
- return this.networkProperties;
+ return this.pb.getNetworkProperties();
}
/**
* @param networkProperties the networkProperties to set
*/
public void setNetworkProperties(String networkProperties) {
- this.networkProperties = networkProperties;
+ this.pb.setNetworkProperties(networkProperties);
+ }
+
+ public PBBrokerInfo getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/Command.java Wed Oct 15 12:12:26 2008
@@ -24,7 +24,7 @@
*
* @version $Revision: 1.7 $
*/
-public interface Command extends DataStructure {
+public interface Command extends DataStructure, ProtocolBufferBacked {
void setCommandId(int value);
@@ -70,4 +70,5 @@
Endpoint getTo();
void setTo(Endpoint to);
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Wed Oct 15 12:12:26 2008
@@ -26,11 +26,15 @@
*/
public class ConnectionControl extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_CONTROL;
- protected boolean suspend;
- protected boolean resume;
- protected boolean close;
- protected boolean exit;
- protected boolean faultTolerant;
+
+ PBConnectionControl pb = new PBConnectionControl();
+
+ public ConnectionControl() {
+ }
+
+ public ConnectionControl(PBConnectionControl pb) {
+ this.pb = pb;
+ }
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -45,14 +49,14 @@
* @return Returns the close.
*/
public boolean isClose() {
- return close;
+ return pb.getClose();
}
/**
* @param close The close to set.
*/
public void setClose(boolean close) {
- this.close = close;
+ this.pb.setClose(close);
}
/**
@@ -60,14 +64,14 @@
* @return Returns the exit.
*/
public boolean isExit() {
- return exit;
+ return pb.getExit();
}
/**
* @param exit The exit to set.
*/
public void setExit(boolean exit) {
- this.exit = exit;
+ this.pb.setExit(exit);
}
/**
@@ -75,14 +79,14 @@
* @return Returns the faultTolerant.
*/
public boolean isFaultTolerant() {
- return faultTolerant;
+ return pb.getFaultTolerant();
}
/**
* @param faultTolerant The faultTolerant to set.
*/
public void setFaultTolerant(boolean faultTolerant) {
- this.faultTolerant = faultTolerant;
+ this.pb.setFaultTolerant(faultTolerant);
}
/**
@@ -90,14 +94,14 @@
* @return Returns the resume.
*/
public boolean isResume() {
- return resume;
+ return pb.getResume();
}
/**
* @param resume The resume to set.
*/
public void setResume(boolean resume) {
- this.resume = resume;
+ this.pb.setResume(resume);
}
/**
@@ -105,13 +109,17 @@
* @return Returns the suspend.
*/
public boolean isSuspend() {
- return suspend;
+ return pb.getSuspend();
}
/**
* @param suspend The suspend to set.
*/
public void setSuspend(boolean suspend) {
- this.suspend = suspend;
+ this.pb.setSuspend(suspend);
+ }
+
+ public PBConnectionControl getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -27,8 +28,14 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_ERROR;
- private ConnectionId connectionId;
- private Throwable exception;
+ PBConnectionError pb = new PBConnectionError();
+
+ public ConnectionError() {
+ }
+
+ public ConnectionError(PBConnectionError pb) {
+ this.pb=pb;
+ }
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -42,22 +49,40 @@
* @openwire:property version=1
*/
public Throwable getException() {
- return exception;
+ if( pb.hasException() ) {
+ return PBConversionSupport.convert(pb.getException(), true);
+ }
+ return null;
}
public void setException(Throwable exception) {
- this.exception = exception;
+ if( exception == null ) {
+ pb.clearException();
+ } else {
+ this.pb.setException(PBConversionSupport.convert(exception, true));
+ }
}
/**
* @openwire:property version=1
*/
public ConnectionId getConnectionId() {
- return connectionId;
+ if( pb.hasConnectionId() ) {
+ return new ConnectionId(pb.getConnectionId());
+ }
+ return null;
}
public void setConnectionId(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ if( connectionId==null ) {
+ pb.clearConnectionId();
+ } else {
+ this.pb.setConnectionId(connectionId.getValue());
+ }
+ }
+
+ public PBConnectionError getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.command;
+import java.util.ArrayList;
+
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -27,21 +30,19 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_INFO;
- protected ConnectionId connectionId;
- protected String clientId;
- protected String userName;
- protected String password;
- protected BrokerId[] brokerPath;
- protected boolean brokerMasterConnector;
- protected boolean manageable;
- protected boolean clientMaster = true;
+ protected PBConnectionInfo pb = new PBConnectionInfo();
+
protected transient Object transportContext;
public ConnectionInfo() {
}
public ConnectionInfo(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ setConnectionId(connectionId);
+ }
+
+ public ConnectionInfo(PBConnectionInfo pb) {
+ this.pb=pb;
}
public byte getDataStructureType() {
@@ -50,34 +51,36 @@
public void copy(ConnectionInfo copy) {
super.copy(copy);
- copy.clientId = clientId;
- copy.userName = userName;
- copy.password = password;
- copy.brokerPath = brokerPath;
- copy.brokerMasterConnector = brokerMasterConnector;
- copy.manageable = manageable;
+ copy.pb = pb.clone(); // copy this -> target (as the removed assignments did); the target gets its own PB instance
}
/**
* @openwire:property version=1 cache=true
*/
public ConnectionId getConnectionId() {
- return connectionId;
+ if( pb.hasConnectionId() ) {
+ return new ConnectionId(pb.getConnectionId());
+ }
+ return null;
}
public void setConnectionId(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ if( connectionId==null ) {
+ pb.clearConnectionId();
+ } else {
+ this.pb.setConnectionId(connectionId.getValue());
+ }
}
/**
* @openwire:property version=1
*/
public String getClientId() {
- return clientId;
+ return pb.getClientId();
}
public void setClientId(String clientId) {
- this.clientId = clientId;
+ this.pb.setClientId(clientId);
}
public RemoveInfo createRemoveCommand() {
@@ -90,22 +93,22 @@
* @openwire:property version=1
*/
public String getPassword() {
- return password;
+ return pb.getPassword();
}
public void setPassword(String password) {
- this.password = password;
+ this.pb.setPassword(password);
}
/**
* @openwire:property version=1
*/
public String getUserName() {
- return userName;
+ return pb.getUserName();
}
public void setUserName(String userName) {
- this.userName = userName;
+ this.pb.setUserName(userName);
}
/**
@@ -114,13 +117,23 @@
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
- return brokerPath;
+ BrokerId rc[]=null;
+ if( pb.hasBrokerPath() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+ }
+ return rc;
}
public void setBrokerPath(BrokerId[] brokerPath) {
- this.brokerPath = brokerPath;
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setBrokerPathList(rc);
+ } else {
+ pb.clearBrokerPath();
+ }
}
+
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processAddConnection(this);
}
@@ -129,28 +142,28 @@
* @openwire:property version=1
*/
public boolean isBrokerMasterConnector() {
- return brokerMasterConnector;
+ return pb.getBrokerMasterConnector();
}
/**
* @param brokerMasterConnector The brokerMasterConnector to set.
*/
- public void setBrokerMasterConnector(boolean slaveBroker) {
- this.brokerMasterConnector = slaveBroker;
+ public void setBrokerMasterConnector(boolean brokerMasterConnector) {
+ this.pb.setBrokerMasterConnector(brokerMasterConnector);
}
/**
* @openwire:property version=1
*/
public boolean isManageable() {
- return manageable;
+ return pb.getManageable();
}
/**
* @param manageable The manageable to set.
*/
public void setManageable(boolean manageable) {
- this.manageable = manageable;
+ this.pb.setManageable(manageable);
}
/**
@@ -180,14 +193,18 @@
* @return the clientMaster
*/
public boolean isClientMaster() {
- return this.clientMaster;
+ return this.pb.getClientMaster();
}
/**
* @param clientMaster the clientMaster to set
*/
public void setClientMaster(boolean clientMaster) {
- this.clientMaster = clientMaster;
+ this.pb.setClientMaster(clientMaster);
+ }
+
+ public PBConnectionInfo getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Wed Oct 15 12:12:26 2008
@@ -28,12 +28,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL;
- protected ConsumerId consumerId;
- protected boolean close;
- protected boolean stop;
- protected boolean start;
- protected boolean flush;
- protected int prefetch;
+ protected PBConsumerControl pb = new PBConsumerControl();
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -48,14 +43,14 @@
* @return Returns the close.
*/
public boolean isClose() {
- return close;
+ return pb.getClose();
}
/**
* @param close The close to set.
*/
public void setClose(boolean close) {
- this.close = close;
+ this.pb.setClose(close);
}
/**
@@ -63,14 +58,21 @@
* @return Returns the consumerId.
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
/**
* @param consumerId The consumerId to set.
*/
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId==null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
@@ -78,14 +80,14 @@
* @return Returns the prefetch.
*/
public int getPrefetch() {
- return prefetch;
+ return pb.getPrefetch();
}
/**
* @param prefetch The prefetch to set.
*/
public void setPrefetch(int prefetch) {
- this.prefetch = prefetch;
+ this.pb.setPrefetch(prefetch);
}
/**
@@ -93,14 +95,14 @@
* @return the flush
*/
public boolean isFlush() {
- return this.flush;
+ return this.pb.getFlush();
}
/**
* @param flush the flush to set
*/
public void setFlush(boolean flush) {
- this.flush = flush;
+ this.pb.setFlush(flush);
}
/**
@@ -108,14 +110,14 @@
* @return the start
*/
public boolean isStart() {
- return this.start;
+ return this.pb.getStart();
}
/**
* @param start the start to set
*/
public void setStart(boolean start) {
- this.start = start;
+ this.pb.setStart(start);
}
/**
@@ -123,13 +125,17 @@
* @return the stop
*/
public boolean isStop() {
- return this.stop;
+ return this.pb.getStop();
}
/**
* @param stop the stop to set
*/
public void setStop(boolean stop) {
- this.stop = stop;
+ this.pb.setStop(stop);
+ }
+
+ public PBConsumerControl getPBCommand() {
+ return pb;
}
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerId.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBCommand;
+import org.apache.activemq.protobuf.BaseMessage;
+
/**
* @openwire:marshaller code="122"
* @version $Revision$
@@ -24,27 +27,30 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_ID;
- protected String connectionId;
- protected long sessionId;
- protected long value;
-
protected transient int hashCode;
protected transient String key;
protected transient SessionId parentId;
+ private PBConsumerId pb = new PBConsumerId();
+
public ConsumerId() {
}
public ConsumerId(SessionId sessionId, long consumerId) {
- this.connectionId = sessionId.getConnectionId();
- this.sessionId = sessionId.getValue();
- this.value = consumerId;
+ setSessionId(sessionId);
+ setValue(consumerId);
+ }
+
+ public void setSessionId(SessionId sessionId) {
+ pb.setSessionId(sessionId.getPB());
+ }
+
+ public ConsumerId(PBConsumerId id) {
+ this.pb = id;
}
public ConsumerId(ConsumerId id) {
- this.connectionId = id.getConnectionId();
- this.sessionId = id.getSessionId();
- this.value = id.getValue();
+ this.pb = id.getPB().clone();
}
public SessionId getParentId() {
@@ -56,7 +62,7 @@
public int hashCode() {
if (hashCode == 0) {
- hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value;
+ hashCode = pb.hashCode();
}
return hashCode;
}
@@ -69,7 +75,7 @@
return false;
}
ConsumerId id = (ConsumerId)o;
- return sessionId == id.sessionId && value == id.value && connectionId.equals(id.connectionId);
+ return pb.equals(id.pb);
}
public byte getDataStructureType() {
@@ -78,7 +84,7 @@
public String toString() {
if (key == null) {
- key = connectionId + ":" + sessionId + ":" + value;
+ key = getConnectionId() + ":" + getSessionId() + ":" + getValue();
}
return key;
}
@@ -87,36 +93,40 @@
* @openwire:property version=1
*/
public String getConnectionId() {
- return connectionId;
+ return pb.getSessionId().getConnectionId();
}
public void setConnectionId(String connectionId) {
- this.connectionId = connectionId;
+ this.pb.getSessionId().setConnectionId(connectionId);
}
/**
* @openwire:property version=1
*/
public long getSessionId() {
- return sessionId;
+ return pb.getSessionId().getId();
}
public void setSessionId(long sessionId) {
- this.sessionId = sessionId;
+ this.pb.getSessionId().setId(sessionId);
}
/**
* @openwire:property version=1
*/
public long getValue() {
- return value;
+ return this.pb.getId();
}
public void setValue(long consumerId) {
- this.value = consumerId;
+ this.pb.setId(consumerId);
}
public boolean isMarshallAware() {
return false;
}
+
+ public PBConsumerId getPB() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Wed Oct 15 12:12:26 2008
@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -35,30 +36,16 @@
public static final byte NETWORK_CONSUMER_PRIORITY = -5;
public static final byte LOW_PRIORITY = -10;
- protected ConsumerId consumerId;
- protected ActiveMQDestination destination;
- protected int prefetchSize;
- protected int maximumPendingMessageLimit;
- protected boolean browser;
- protected boolean dispatchAsync;
- protected String selector;
- protected String subscriptionName;
- protected boolean noLocal;
- protected boolean exclusive;
- protected boolean retroactive;
- protected byte priority;
- protected BrokerId[] brokerPath;
- protected boolean optimizedAcknowledge;
+ PBConsumerInfo pb = new PBConsumerInfo();
+
// used by the broker
protected transient int currentPrefetchSize;
- // if true, the consumer will not send range
- protected boolean noRangeAcks;
- // acks.
-
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; // this subscription
protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
+ private ActiveMQDestination destination;
+
// originated from a
// network connection
@@ -66,11 +53,15 @@
}
public ConsumerInfo(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ this.setConsumerId(consumerId);
}
public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
- this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
+ this(new ConsumerId(sessionInfo.getSessionId(), consumerId));
+ }
+
+ public ConsumerInfo(PBConsumerInfo pb) {
+ this.pb=pb;
}
public ConsumerInfo copy() {
@@ -81,19 +72,7 @@
public void copy(ConsumerInfo info) {
super.copy(info);
- info.consumerId = consumerId;
- info.destination = destination;
- info.prefetchSize = prefetchSize;
- info.maximumPendingMessageLimit = maximumPendingMessageLimit;
- info.browser = browser;
- info.dispatchAsync = dispatchAsync;
- info.selector = selector;
- info.subscriptionName = subscriptionName;
- info.noLocal = noLocal;
- info.exclusive = exclusive;
- info.retroactive = retroactive;
- info.priority = priority;
- info.brokerPath = brokerPath;
+ info.pb = pb.clone(); // copy this -> target (as the removed assignments did), never the reverse
info.networkSubscription = networkSubscription;
if (networkConsumerIds != null) {
if (info.networkConsumerIds==null){
@@ -104,7 +83,7 @@
}
public boolean isDurable() {
- return subscriptionName != null;
+ return getSubscriptionName() != null;
}
public byte getDataStructureType() {
@@ -117,11 +96,18 @@
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
- return consumerId;
+ if( pb.hasConsumerId() ) {
+ return new ConsumerId(pb.getConsumerId());
+ }
+ return null;
}
public void setConsumerId(ConsumerId consumerId) {
- this.consumerId = consumerId;
+ if( consumerId==null ) {
+ pb.clearConsumerId();
+ } else {
+ this.pb.setConsumerId(consumerId.getPB());
+ }
}
/**
@@ -130,11 +116,11 @@
* @openwire:property version=1
*/
public boolean isBrowser() {
- return browser;
+ return pb.getBrowser();
}
public void setBrowser(boolean browser) {
- this.browser = browser;
+ this.pb.setBrowser(browser);
}
/**
@@ -144,11 +130,19 @@
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination==null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination = null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
+ // A null destination clears the PB field (same convention as setConsumerId/setBrokerPath)
+ // instead of dereferencing null; getDestination() then reports null via hasDestination().
+ if( destination==null ) {
+ pb.clearDestination();
+ } else {
+ this.pb.setDestination(destination.getPB());
+ }
}
/**
@@ -158,11 +152,11 @@
* @openwire:property version=1
*/
public int getPrefetchSize() {
- return prefetchSize;
+ return pb.getPrefetchSize();
}
public void setPrefetchSize(int prefetchSize) {
- this.prefetchSize = prefetchSize;
+ this.pb.setPrefetchSize(prefetchSize);
this.currentPrefetchSize = prefetchSize;
}
@@ -173,11 +167,11 @@
* @openwire:property version=1
*/
public int getMaximumPendingMessageLimit() {
- return maximumPendingMessageLimit;
+ return pb.getMaximumPendingMessageLimit();
}
public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
- this.maximumPendingMessageLimit = maximumPendingMessageLimit;
+ this.pb.setMaximumPendingMessageLimit(maximumPendingMessageLimit);
}
/**
@@ -190,11 +184,11 @@
* @openwire:property version=1
*/
public boolean isDispatchAsync() {
- return dispatchAsync;
+ return pb.getDispatchAsync();
}
public void setDispatchAsync(boolean dispatchAsync) {
- this.dispatchAsync = dispatchAsync;
+ this.pb.setDispatchAsync(dispatchAsync);
}
/**
@@ -204,11 +198,11 @@
* @openwire:property version=1
*/
public String getSelector() {
- return selector;
+ return pb.getSelector();
}
public void setSelector(String selector) {
- this.selector = selector;
+ this.pb.setSelector(selector);
}
/**
@@ -217,11 +211,11 @@
* @openwire:property version=1
*/
public String getSubscriptionName() {
- return subscriptionName;
+ return pb.getSubscriptionName();
}
- public void setSubscriptionName(String durableSubscriptionId) {
- this.subscriptionName = durableSubscriptionId;
+ public void setSubscriptionName(String subscriptionName) {
+ this.pb.setSubscriptionName(subscriptionName);
}
/**
@@ -230,7 +224,7 @@
* @see getSubscriptionName
*/
public String getSubcriptionName() {
- return subscriptionName;
+ return getSubscriptionName();
}
/**
@@ -238,8 +232,8 @@
* @see setSubscriptionName
* @param durableSubscriptionId
*/
- public void setSubcriptionName(String durableSubscriptionId) {
- this.subscriptionName = durableSubscriptionId;
+ public void setSubcriptionName(String subscriptionName) {
+ setSubscriptionName(subscriptionName);
}
/**
@@ -249,11 +243,11 @@
* @openwire:property version=1
*/
public boolean isNoLocal() {
- return noLocal;
+ return pb.getNoLocal();
}
public void setNoLocal(boolean noLocal) {
- this.noLocal = noLocal;
+ this.pb.setNoLocal(noLocal);
}
/**
@@ -265,11 +259,11 @@
* @openwire:property version=1
*/
public boolean isExclusive() {
- return exclusive;
+ return pb.getExclusive();
}
public void setExclusive(boolean exclusive) {
- this.exclusive = exclusive;
+ this.pb.setExclusive(exclusive);
}
/**
@@ -283,11 +277,11 @@
* @openwire:property version=1
*/
public boolean isRetroactive() {
- return retroactive;
+ return pb.getRetroactive();
}
public void setRetroactive(boolean retroactive) {
- this.retroactive = retroactive;
+ this.pb.setRetroactive(retroactive);
}
public RemoveInfo createRemoveCommand() {
@@ -305,11 +299,11 @@
* @openwire:property version=1
*/
public byte getPriority() {
- return priority;
+ return (byte) pb.getPriority();
}
public void setPriority(byte priority) {
- this.priority = priority;
+ this.pb.setPriority(priority);
}
/**
@@ -318,11 +312,20 @@
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
- return brokerPath;
+ BrokerId rc[]=null;
+ if( pb.hasBrokerPath() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+ }
+ return rc;
}
public void setBrokerPath(BrokerId[] brokerPath) {
- this.brokerPath = brokerPath;
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setBrokerPathList(rc);
+ } else {
+ pb.clearBrokerPath();
+ }
}
/**
@@ -365,14 +368,14 @@
* @return Returns the optimizedAcknowledge.
*/
public boolean isOptimizedAcknowledge() {
- return optimizedAcknowledge;
+ return pb.getOptimizedAcknowledge();
}
/**
* @param optimizedAcknowledge The optimizedAcknowledge to set.
*/
public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
- this.optimizedAcknowledge = optimizedAcknowledge;
+ this.pb.setOptimizedAcknowledge(optimizedAcknowledge);
}
/**
@@ -397,11 +400,11 @@
* @openwire:property version=1
*/
public boolean isNoRangeAcks() {
- return noRangeAcks;
+ return pb.getNoRangeAcks();
}
public void setNoRangeAcks(boolean noRangeAcks) {
- this.noRangeAcks = noRangeAcks;
+ this.pb.setNoRangeAcks(noRangeAcks);
}
public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
@@ -432,4 +435,8 @@
return result;
}
+ public PBConsumerInfo getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java Wed Oct 15 12:12:26 2008
@@ -29,7 +29,7 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONTROL_COMMAND;
- private String command;
+ private PBControlCommand pb = new PBControlCommand(); // must be non-null: getCommand()/setCommand() dereference it
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -39,14 +39,18 @@
* @openwire:property version=1
*/
public String getCommand() {
- return command;
+ return pb.getCommand();
}
public void setCommand(String command) {
- this.command = command;
+ this.pb.setCommand(command);
}
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processControlCommand(this);
}
+
+ public PBControlCommand getPBCommand() {
+ return pb;
+ }
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DataStructure.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+
/**
* @version $Revision$
*/
@@ -26,5 +27,5 @@
*/
byte getDataStructureType();
boolean isMarshallAware();
-
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java Wed Oct 15 12:12:26 2008
@@ -17,7 +17,10 @@
package org.apache.activemq.command;
import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.activemq.command.PBDestinationInfo.PBDestinationOperationType;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -33,19 +36,21 @@
public static final byte ADD_OPERATION_TYPE = 0;
public static final byte REMOVE_OPERATION_TYPE = 1;
- protected ConnectionId connectionId;
- protected ActiveMQDestination destination;
- protected byte operationType;
- protected long timeout;
- protected BrokerId[] brokerPath;
+ protected PBDestinationInfo pb = new PBDestinationInfo();
+
+ private ActiveMQDestination destination;
public DestinationInfo() {
}
public DestinationInfo(ConnectionId connectionId, byte operationType, ActiveMQDestination destination) {
- this.connectionId = connectionId;
- this.operationType = operationType;
- this.destination = destination;
+ setConnectionId(connectionId);
+ setOperationType(operationType);
+ setDestination(destination);
+ }
+
+ public DestinationInfo(PBDestinationInfo pb) {
+ this.pb=pb;
}
public byte getDataStructureType() {
@@ -53,32 +58,47 @@
}
public boolean isAddOperation() {
- return operationType == ADD_OPERATION_TYPE;
+ return pb.getOperationType() == PBDestinationInfo.PBDestinationOperationType.ADD_OPERATION_TYPE;
}
public boolean isRemoveOperation() {
- return operationType == REMOVE_OPERATION_TYPE;
+ return pb.getOperationType() == PBDestinationInfo.PBDestinationOperationType.REMOVE_OPERATION_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/
public ConnectionId getConnectionId() {
- return connectionId;
+ if( pb.hasConnectionId() ) {
+ return new ConnectionId(pb.getConnectionId());
+ }
+ return null;
}
public void setConnectionId(ConnectionId connectionId) {
- this.connectionId = connectionId;
+ if( connectionId == null ) {
+ pb.clearConnectionId();
+ } else {
+ this.pb.setConnectionId(connectionId.getValue());
+ }
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
+ if( pb.hasDestination() ) {
+ if( destination==null ) {
+ destination = PBConversionSupport.convert(pb.getDestination());
+ }
+ } else {
+ destination = null;
+ }
return destination;
}
public void setDestination(ActiveMQDestination destination) {
+ if( destination==null ) { pb.clearDestination(); } else { pb.setDestination(destination.getPB()); } // null clears the field, as setConnectionId does
this.destination = destination;
}
@@ -86,22 +106,22 @@
* @openwire:property version=1
*/
public byte getOperationType() {
- return operationType;
+ return (byte) pb.getOperationType().getNumber();
}
public void setOperationType(byte operationType) {
- this.operationType = operationType;
+ this.pb.setOperationType(PBDestinationOperationType.valueOf(operationType));
}
/**
* @openwire:property version=1
*/
public long getTimeout() {
- return timeout;
+ return pb.getTimeout();
}
public void setTimeout(long timeout) {
- this.timeout = timeout;
+ this.pb.setTimeout(timeout);
}
/**
@@ -110,11 +130,20 @@
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
- return brokerPath;
+ BrokerId rc[]=null;
+ if( pb.hasBrokerPath() ) {
+ rc = PBConversionSupport.convertBrokerIdList(pb.getBrokerPathList());
+ }
+ return rc;
}
public void setBrokerPath(BrokerId[] brokerPath) {
- this.brokerPath = brokerPath;
+ if( brokerPath!=null ) {
+ ArrayList<String> rc = PBConversionSupport.convertBrokerIdList(brokerPath);
+ pb.setBrokerPathList(rc);
+ } else {
+ pb.clearBrokerPath();
+ }
}
public Response visit(CommandVisitor visitor) throws Exception {
@@ -126,4 +155,8 @@
throw new IOException("Unknown operation type: " + getOperationType());
}
+ public PBDestinationInfo getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/FlushCommand.java Wed Oct 15 12:12:26 2008
@@ -29,6 +29,8 @@
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.FLUSH_COMMAND;
public static final Command COMMAND = new FlushCommand();
+ protected PBFlushCommand pb = new PBFlushCommand();
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -37,4 +39,8 @@
return visitor.processFlush(this);
}
+ public PBFlushCommand getPBCommand() {
+ return pb;
+ }
+
}
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/JournalQueueAck.java Wed Oct 15 12:12:26 2008
@@ -16,18 +16,26 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.pbwire.PBConversionSupport;
import org.apache.activemq.util.IntrospectionSupport;
/**
* @openwire:marshaller code="52"
* @version $Revision$
*/
-public class JournalQueueAck implements DataStructure {
+public class JournalQueueAck implements DataStructure, ProtocolBufferBacked {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_REMOVE;
- ActiveMQDestination destination;
- MessageAck messageAck;
+ final PBJournalQueueAck pb;
+
+ public JournalQueueAck(){
+ this(new PBJournalQueueAck());
+ }
+
+ public JournalQueueAck(PBJournalQueueAck pb) {
+ this.pb=pb;
+ }
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -37,22 +45,36 @@
* @openwire:property version=1
*/
public ActiveMQDestination getDestination() {
- return destination;
+ if( pb.hasDestination() ) {
+ return PBConversionSupport.convert(pb.getDestination());
+ }
+ return null;
}
public void setDestination(ActiveMQDestination destination) {
- this.destination = destination;
+ if( destination==null ) {
+ pb.clearDestination();
+ } else {
+ pb.setDestination(destination.getPB());
+ }
}
/**
* @openwire:property version=1
*/
public MessageAck getMessageAck() {
- return messageAck;
+ if( pb.hasMessageAck() ) {
+ return new MessageAck(pb.getMessageAck());
+ }
+ return null;
}
public void setMessageAck(MessageAck messageAck) {
- this.messageAck = messageAck;
+ if( messageAck==null ) {
+ pb.clearMessageAck();
+ } else {
+ pb.setMessageAck(messageAck.getPBCommand());
+ }
}
public boolean isMarshallAware() {
@@ -60,7 +82,15 @@
}
public String toString() {
- return IntrospectionSupport.toString(this, JournalQueueAck.class);
+ return getClass().getSimpleName()+"[\n"+getPBCommand().toString()+"]";
+ }
+
+ public Command getCommandObject() {
+ return null;
+ }
+
+ public PBJournalQueueAck getPBCommand() {
+ return pb;
}
}