You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC
svn commit: r1097189 [39/42] - in /activemq/activemq-apollo/trunk: ./
apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/
apollo-openwire/src/main/resources/
apollo-openwire/src/main/resources/META-INF/
apollo-openwire/src/main/resources/ME...
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.advisory.AdvisorySupport;
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.filter.Filterable;
+import org.apache.activemq.apollo.openwire.support.broker.region.MessageReference;
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ * Represents an ActiveMQ message
+ *
+ * @openwire:marshaller
+ */
+public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
+
+ /**
+ * The default minimum amount of memory a message is assumed to use
+ */
+ public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
+
+ protected MessageId messageId;
+ protected ActiveMQDestination originalDestination;
+ protected TransactionId originalTransactionId;
+
+ protected ProducerId producerId;
+ protected ActiveMQDestination destination;
+ protected TransactionId transactionId;
+
+ protected long expiration;
+ protected long timestamp;
+ protected long arrival;
+ protected long brokerInTime;
+ protected long brokerOutTime;
+ protected String correlationId;
+ protected ActiveMQDestination replyTo;
+ protected boolean persistent;
+ protected String type;
+ protected byte priority;
+ protected String groupID;
+ protected int groupSequence;
+ protected ConsumerId targetConsumerId;
+ protected boolean compressed;
+ protected String userID;
+
+ protected Buffer content;
+ protected Buffer marshalledProperties;
+ protected DataStructure dataStructure;
+ protected int redeliveryCounter;
+
+ protected int size;
+ protected Map<String, Object> properties;
+ protected boolean readOnlyProperties;
+ protected boolean readOnlyBody;
+ protected transient boolean recievedByDFBridge;
+ protected boolean droppable;
+
+ private transient short referenceCount;
+
+ private BrokerId[] brokerPath;
+ private BrokerId[] cluster;
+
+ public abstract org.apache.activemq.apollo.openwire.command.Message copy();
+ public abstract void clearBody() throws OpenwireException;
+
+ /**
+ * Copies the state of this message into the supplied instance.
+ * <p>
+ * Most fields are copied by reference or by value. The message id is
+ * duplicated through its own copy() method and the property map is
+ * duplicated into a fresh HashMap, so the target does not share those two
+ * objects with this message. The target consumer id and the reference
+ * count are intentionally left untouched on the target.
+ *
+ * @param copy the message that receives this message's state
+ */
+ protected void copy(org.apache.activemq.apollo.openwire.command.Message copy) {
+ super.copy(copy);
+
+ // identity and routing
+ copy.producerId = producerId;
+ copy.transactionId = transactionId;
+ copy.destination = destination;
+ if (messageId == null) {
+ copy.messageId = null;
+ } else {
+ copy.messageId = messageId.copy();
+ }
+ copy.originalDestination = originalDestination;
+ copy.originalTransactionId = originalTransactionId;
+ copy.replyTo = replyTo;
+ copy.brokerPath = brokerPath;
+
+ // header values
+ copy.expiration = expiration;
+ copy.timestamp = timestamp;
+ copy.arrival = arrival;
+ copy.brokerInTime = brokerInTime;
+ copy.brokerOutTime = brokerOutTime;
+ copy.correlationId = correlationId;
+ copy.persistent = persistent;
+ copy.redeliveryCounter = redeliveryCounter;
+ copy.type = type;
+ copy.priority = priority;
+ copy.size = size;
+ copy.groupID = groupID;
+ copy.groupSequence = groupSequence;
+ copy.userID = userID;
+
+ // properties get their own map instance; a null map stays null
+ copy.properties = properties != null ? new HashMap<String, Object>(properties) : null;
+
+ // payload and flags
+ copy.content = content;
+ copy.marshalledProperties = marshalledProperties;
+ copy.dataStructure = dataStructure;
+ copy.readOnlyProperties = readOnlyProperties;
+ copy.readOnlyBody = readOnlyBody;
+ copy.compressed = compressed;
+ copy.recievedByDFBridge = recievedByDFBridge;
+
+ // lets not copy the following fields
+ // copy.targetConsumerId = targetConsumerId;
+ // copy.referenceCount = referenceCount;
+ }
+
+ /**
+ * Looks up a single application property, decoding the marshalled
+ * property buffer on first use.
+ *
+ * @param name the property name
+ * @return the stored value, or null when the property is absent or the
+ * message carries no properties at all
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ public Object getProperty(String name) throws IOException {
+ if (properties != null) {
+ return properties.get(name);
+ }
+ if (marshalledProperties == null) {
+ return null;
+ }
+ properties = unmarsallProperties(marshalledProperties);
+ return properties.get(name);
+ }
+
+ /**
+ * Returns a read-only view of the application properties, decoding the
+ * marshalled property buffer on first use.
+ *
+ * @return an unmodifiable view of the property map, or an immutable empty
+ * map when the message has neither decoded nor marshalled properties
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ public Map<String, Object> getProperties() throws IOException {
+ if (properties == null) {
+ if (marshalledProperties == null) {
+ // Type-safe form of the raw Collections.EMPTY_MAP constant; no
+ // unchecked-warning suppression is needed on the method.
+ return Collections.<String, Object>emptyMap();
+ }
+ properties = unmarsallProperties(marshalledProperties);
+ }
+ return Collections.unmodifiableMap(properties);
+ }
+
+ public void clearProperties() {
+ marshalledProperties = null;
+ properties = null;
+ }
+
+ public void setProperty(String name, Object value) throws IOException {
+ lazyCreateProperties();
+ properties.put(name, value);
+ }
+
+ /**
+ * Ensures the mutable property map exists before it is written to.
+ * <p>
+ * When marshalled properties are present they are decoded into the map
+ * and the marshalled buffer is dropped; otherwise an empty map is created.
+ *
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ protected void lazyCreateProperties() throws IOException {
+ if (properties != null) {
+ return;
+ }
+ if (marshalledProperties != null) {
+ properties = unmarsallProperties(marshalledProperties);
+ marshalledProperties = null;
+ } else {
+ properties = new HashMap<String, Object>();
+ }
+ }
+
+ private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+ return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
+ }
+
+ /**
+ * Encodes the property map into the marshalled buffer before the message
+ * is written, unless a marshalled buffer already exists or there are no
+ * properties to encode.
+ *
+ * @param wireFormat the wire format performing the marshalling (not used here)
+ * @throws IOException if the properties cannot be encoded
+ */
+ public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+ if (marshalledProperties != null || properties == null) {
+ return;
+ }
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bytes);
+ MarshallingSupport.marshalPrimitiveMap(properties, out);
+ out.close();
+ marshalledProperties = bytes.toBuffer();
+ }
+
+ public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+ }
+
+ public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+ }
+
+ public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ //
+ // Simple Field accessors
+ //
+ // /////////////////////////////////////////////////////////////////
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ProducerId getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(ProducerId producerId) {
+ this.producerId = producerId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public TransactionId getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public boolean isInTransaction() {
+ return transactionId != null;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getOriginalDestination() {
+ return originalDestination;
+ }
+
+ public void setOriginalDestination(ActiveMQDestination destination) {
+ this.originalDestination = destination;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(MessageId messageId) {
+ this.messageId = messageId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public TransactionId getOriginalTransactionId() {
+ return originalTransactionId;
+ }
+
+ public void setOriginalTransactionId(TransactionId transactionId) {
+ this.originalTransactionId = transactionId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getGroupID() {
+ return groupID;
+ }
+
+ public void setGroupID(String groupID) {
+ this.groupID = groupID;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getGroupSequence() {
+ return groupSequence;
+ }
+
+ public void setGroupSequence(int groupSequence) {
+ this.groupSequence = groupSequence;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isPersistent() {
+ return persistent;
+ }
+
+ public void setPersistent(boolean deliveryMode) {
+ this.persistent = deliveryMode;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getExpiration() {
+ return expiration;
+ }
+
+ public void setExpiration(long expiration) {
+ this.expiration = expiration;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public byte getPriority() {
+ return priority;
+ }
+
+ public void setPriority(byte priority) {
+ this.priority = priority;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public ActiveMQDestination getReplyTo() {
+ return replyTo;
+ }
+
+ public void setReplyTo(ActiveMQDestination replyTo) {
+ this.replyTo = replyTo;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public Buffer getContent() {
+ return content;
+ }
+
+ public void setContent(Buffer content) {
+ this.content = content;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public Buffer getMarshalledProperties() {
+ return marshalledProperties;
+ }
+
+ public void setMarshalledProperties(Buffer marshalledProperties) {
+ this.marshalledProperties = marshalledProperties;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public DataStructure getDataStructure() {
+ return dataStructure;
+ }
+
+ public void setDataStructure(DataStructure data) {
+ this.dataStructure = data;
+ }
+
+ /**
+ * Can be used to route the message to a specific consumer. Should be null
+ * to allow the broker use normal JMS routing semantics. If the target
+ * consumer id is an active consumer on the broker, the message is dropped.
+ * Used by the AdvisoryBroker to replay advisory messages to a specific
+ * consumer.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getTargetConsumerId() {
+ return targetConsumerId;
+ }
+
+ public void setTargetConsumerId(ConsumerId targetConsumerId) {
+ this.targetConsumerId = targetConsumerId;
+ }
+
+ /**
+ * Tells whether the expiration time of this message has passed.
+ *
+ * @return true when an expiration greater than zero is set and the current
+ * system time is later than it; false otherwise
+ */
+ public boolean isExpired() {
+ long deadline = getExpiration();
+ return deadline > 0 && System.currentTimeMillis() > deadline;
+ }
+
+ public boolean isAdvisory() {
+ return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
+
+ public boolean isRedelivered() {
+ return redeliveryCounter > 0;
+ }
+
+ /**
+ * Adjusts the redelivery counter so that isRedelivered() matches the
+ * requested flag. The counter is only touched when the flag actually
+ * changes: it becomes 1 when switching to redelivered and 0 when
+ * switching back.
+ *
+ * @param redelivered the desired redelivered state
+ */
+ public void setRedelivered(boolean redelivered) {
+ if (redelivered != isRedelivered()) {
+ setRedeliveryCounter(redelivered ? 1 : 0);
+ }
+ }
+
+ public void incrementRedeliveryCounter() {
+ redeliveryCounter++;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getRedeliveryCounter() {
+ return redeliveryCounter;
+ }
+
+ public void setRedeliveryCounter(int deliveryCounter) {
+ this.redeliveryCounter = deliveryCounter;
+ }
+
+ /**
+ * The route of brokers the command has moved through.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public BrokerId[] getBrokerPath() {
+ return brokerPath;
+ }
+
+ public void setBrokerPath(BrokerId[] brokerPath) {
+ this.brokerPath = brokerPath;
+ }
+
+ public boolean isReadOnlyProperties() {
+ return readOnlyProperties;
+ }
+
+ public void setReadOnlyProperties(boolean readOnlyProperties) {
+ this.readOnlyProperties = readOnlyProperties;
+ }
+
+ public boolean isReadOnlyBody() {
+ return readOnlyBody;
+ }
+
+ public void setReadOnlyBody(boolean readOnlyBody) {
+ this.readOnlyBody = readOnlyBody;
+ }
+
+ /**
+ * Used to schedule the arrival time of a message to a broker. The broker
+ * will not dispatch a message to a consumer until its arrival time has
+ * elapsed.
+ *
+ * @openwire:property version=1
+ */
+ public long getArrival() {
+ return arrival;
+ }
+
+ public void setArrival(long arrival) {
+ this.arrival = arrival;
+ }
+
+ /**
+ * Only set by the broker and defines the userID of the producer connection
+ * who sent this message. This is an optional field, it needs to be enabled
+ * on the broker to have this field populated.
+ *
+ * @openwire:property version=1
+ */
+ public String getUserID() {
+ return userID;
+ }
+
+ public void setUserID(String jmsxUserID) {
+ this.userID = jmsxUserID;
+ }
+
+ public int getReferenceCount() {
+ return referenceCount;
+ }
+
+ public org.apache.activemq.apollo.openwire.command.Message getMessageHardRef() {
+ return this;
+ }
+
+ public org.apache.activemq.apollo.openwire.command.Message getMessage() throws IOException {
+ return this;
+ }
+
+ public boolean isMarshallAware() {
+ return true;
+ }
+
+ /**
+ * Increments the reference count of this message while holding the
+ * message's monitor.
+ *
+ * @return the reference count after the increment
+ */
+ public int incrementReferenceCount() {
+ int rc;
+ int size;
+ synchronized (this) {
+ rc = ++referenceCount;
+ // The local is only consumed by the disabled memory-usage accounting
+ // below, but getSize() is still called and may store a freshly
+ // computed value in the size field.
+ size = getSize();
+ }
+
+// if (rc == 1 && getMemoryUsage() != null) {
+// getMemoryUsage().increaseUsage(size);
+// }
+
+ //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
+ return rc;
+ }
+
+ /**
+ * Decrements the reference count of this message while holding the
+ * message's monitor.
+ *
+ * @return the reference count after the decrement
+ */
+ public int decrementReferenceCount() {
+ int rc;
+ int size;
+ synchronized (this) {
+ rc = --referenceCount;
+ // The local is only consumed by the disabled memory-usage accounting
+ // below, but getSize() is still called and may store a freshly
+ // computed value in the size field.
+ size = getSize();
+ }
+//
+// if (rc == 0 && getMemoryUsage() != null) {
+// getMemoryUsage().decreaseUsage(size);
+// }
+ //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
+
+ return rc;
+ }
+
+ /**
+ * Returns the size accounted for this message, computing and caching it
+ * when no usable value is stored yet.
+ * <p>
+ * When the stored size is zero or below getMinimumMessageSize(), it is
+ * recomputed as the minimum message size plus the lengths of the
+ * marshalled properties and of the content, where present, and the result
+ * is kept in the size field.
+ *
+ * @return the stored or newly computed size
+ */
+ public int getSize() {
+ int minimumMessageSize = getMinimumMessageSize();
+ if (size < minimumMessageSize || size == 0) {
+ // Start from the minimum and add whatever payload is currently held.
+ size = minimumMessageSize;
+ if (marshalledProperties != null) {
+ size += marshalledProperties.getLength();
+ }
+ if (content != null) {
+ size += content.getLength();
+ }
+ }
+ return size;
+ }
+
+ protected int getMinimumMessageSize() {
+ return DEFAULT_MINIMUM_MESSAGE_SIZE;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the recievedByDFBridge.
+ */
+ public boolean isRecievedByDFBridge() {
+ return recievedByDFBridge;
+ }
+
+ /**
+ * @param recievedByDFBridge The recievedByDFBridge to set.
+ */
+ public void setRecievedByDFBridge(boolean recievedByDFBridge) {
+ this.recievedByDFBridge = recievedByDFBridge;
+ }
+
+ public void onMessageRolledBack() {
+ incrementRedeliveryCounter();
+ }
+
+ /**
+ * @openwire:property version=2 cache=true
+ */
+ public boolean isDroppable() {
+ return droppable;
+ }
+
+ public void setDroppable(boolean droppable) {
+ this.droppable = droppable;
+ }
+
+ /**
+ * If a message is stored in multiple nodes on a cluster, all the cluster
+ * members will be listed here. Otherwise, it will be null.
+ *
+ * @openwire:property version=3 cache=true
+ */
+ public BrokerId[] getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(BrokerId[] cluster) {
+ this.cluster = cluster;
+ }
+
+ public boolean isMessage() {
+ return true;
+ }
+
+ /**
+ * @openwire:property version=3
+ */
+ public long getBrokerInTime() {
+ return this.brokerInTime;
+ }
+
+ public void setBrokerInTime(long brokerInTime) {
+ this.brokerInTime = brokerInTime;
+ }
+
+ /**
+ * @openwire:property version=3
+ */
+ public long getBrokerOutTime() {
+ return this.brokerOutTime;
+ }
+
+ public void setBrokerOutTime(long brokerOutTime) {
+ this.brokerOutTime = brokerOutTime;
+ }
+
+ public boolean isDropped() {
+ return false;
+ }
+
+ public String toString() {
+ return toString(null);
+ }
+
+ /**
+ * Builds the string form of this message, first trying to decode any
+ * marshalled properties so that the properties field is populated.
+ *
+ * @param overrideFields passed through unchanged to the superclass
+ * implementation
+ * @return the string produced by the superclass implementation
+ */
+ public String toString(Map<String, Object>overrideFields) {
+ try {
+ // Called only for its side effect of filling the properties field.
+ getProperties();
+ } catch (IOException e) {
+ // Best effort: a decoding failure is ignored here so that building
+ // the string does not fail; the properties field simply stays unset.
+ }
+ return super.toString(overrideFields);
+ }
+
+ private static final Map<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+// private Object destination;
+
+ interface Expression {
+ public Object evaluate(Message mc);
+ }
+
+ static {
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() {
+ public Object evaluate(Message message) {
+ ActiveMQDestination dest = message.getOriginalDestination();
+ if (dest == null) {
+ dest = message.getDestination();
+ }
+ if (dest == null) {
+ return null;
+ }
+ return dest.toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() {
+ public Object evaluate(Message message) {
+ if (message.getReplyTo() == null) {
+ return null;
+ }
+ return message.getReplyTo().toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() {
+ public Object evaluate(Message message) {
+ return message.getType();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() {
+ public Object evaluate(Message message) {
+ return Integer.valueOf(message.isPersistent() ? 2 : 1);
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() {
+ public Object evaluate(Message message) {
+ return Integer.valueOf(message.getPriority());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new Expression() {
+ public Object evaluate(Message message) {
+ if (message.getMessageId() == null) {
+ return null;
+ }
+ return message.getMessageId().toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getTimestamp());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() {
+ public Object evaluate(Message message) {
+ return message.getCorrelationId();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getExpiration());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() {
+ public Object evaluate(Message message) {
+ return Boolean.valueOf(message.isRedelivered());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXDeliveryCount", new Expression() {
+ public Object evaluate(Message message) {
+ return Integer.valueOf(message.getRedeliveryCounter() + 1);
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupID", new Expression() {
+ public Object evaluate(Message message) {
+ return message.getGroupID();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupSeq", new Expression() {
+ // Exposes the group sequence as an Integer. Uses Integer.valueOf, like
+ // the other numeric expressions in this table, instead of allocating
+ // through the Integer constructor.
+ public Object evaluate(Message message) {
+ return Integer.valueOf(message.getGroupSequence());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXProducerTXID", new Expression() {
+ // Exposes the producer's transaction id, preferring the original
+ // transaction id when one is set, and yields null when the message is
+ // not part of a transaction. The id's string form is returned as-is:
+ // parsing it with the Integer constructor throws NumberFormatException
+ // for any id whose string form is not a plain integer, and JMS defines
+ // JMSXProducerTXID as a String property.
+ public Object evaluate(Message message) {
+ TransactionId txId = message.getOriginalTransactionId();
+ if (txId == null) {
+ txId = message.getTransactionId();
+ }
+ if (txId == null) {
+ return null;
+ }
+ return txId.toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerInTime", new Expression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getBrokerInTime());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerOutTime", new Expression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getBrokerOutTime());
+ }
+ });
+ }
+
+ /**
+ * Creates a Filterable view of this message.
+ * <p>
+ * The returned object reads from this message each time it is called; it
+ * does not take a snapshot.
+ *
+ * @return a new Filterable backed by this message
+ */
+ public Filterable createFilterable() {
+ return new Filterable() {
+ // Returns the body as text for ActiveMQTextMessage when String is
+ // requested, or as a Buffer for ActiveMQBytesMessage when Buffer is
+ // requested. Every other combination yields null.
+ public <T> T getBodyAs(Class<T> type) throws FilterException {
+ try {
+ if( type == String.class ) {
+ if ( Message.this instanceof ActiveMQTextMessage ) {
+ return type.cast(((ActiveMQTextMessage)Message.this).getText());
+ }
+ }
+ if( type == Buffer.class ) {
+ if ( Message.this instanceof ActiveMQBytesMessage ) {
+ ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)Message.this);
+ // Body length is narrowed to int to size the array.
+ // NOTE(review): presumably readBytes consumes the body
+ // from the message's current read position - confirm.
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return type.cast(new Buffer(data));
+ }
+ }
+ return null;
+ } catch (OpenwireException e) {
+ // Surface message access failures through the filter API.
+ throw new FilterException(e);
+ }
+ }
+
+ // Resolves the names registered in JMS_PROPERTY_EXPRESSIONS first and
+ // falls back to the application properties for any other name.
+ public Object getProperty(String name) {
+ Expression expression = JMS_PROPERTY_EXPRESSIONS.get(name);
+ if( expression != null ) {
+ return expression.evaluate(Message.this);
+ }
+ try {
+ return Message.this.getProperty(name);
+ } catch (IOException e) {
+ // A property decoding failure is printed and reported to the
+ // caller as a missing property.
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ // Not implemented yet; always throws UnsupportedOperationException.
+ public Object getLocalConnectionId() {
+ // TODO:
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="22"
+ * @version $Revision: 1.11 $
+ */
+public class MessageAck extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
+
+ /**
+ * Used to let the broker know that the message has been delivered to the
+ * client. Message will still be retained until a standard ack is received.
+ * This is used to get the broker to send more messages past prefetch limits
+ * when a standard ack has not been sent.
+ */
+ public static final byte DELIVERED_ACK_TYPE = 0;
+
+ /**
+ * The standard ack case where a client wants the message to be discarded.
+ */
+ public static final byte STANDARD_ACK_TYPE = 2;
+
+ /**
+ * In case the client wants to explicitly let the broker know that a
+ * message was not processed and the message was considered a poison
+ * message.
+ */
+ public static final byte POSION_ACK_TYPE = 1;
+
+ /**
+ * In case the client wants to explicitly let the broker know that a
+ * message was not processed and it was re-delivered to the consumer
+ * but it was not yet considered to be a poison message. The messageCount
+ * field will hold the number of times the message was re-delivered.
+ */
+ public static final byte REDELIVERED_ACK_TYPE = 3;
+
+ /**
+ * The ack case where a client wants only an individual message to be discarded.
+ */
+ public static final byte INDIVIDUAL_ACK_TYPE = 4;
+
+ protected byte ackType;
+ protected ConsumerId consumerId;
+ protected MessageId firstMessageId;
+ protected MessageId lastMessageId;
+ protected ActiveMQDestination destination;
+ protected TransactionId transactionId;
+ protected int messageCount;
+
+ protected transient String consumerKey;
+
+ public MessageAck() {
+ }
+
+ /**
+ * Creates an acknowledgement for the message carried by a dispatch.
+ * <p>
+ * The consumer id, destination and last message id are taken from the
+ * dispatch; the first message id and the transaction id are left unset.
+ * The dispatch and the message it carries are dereferenced without a null
+ * check.
+ *
+ * @param md the dispatch whose message is being acknowledged
+ * @param ackType the acknowledgement type to record
+ * @param messageCount the message count to record
+ */
+ public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
+ this.ackType = ackType;
+ this.consumerId = md.getConsumerId();
+ this.destination = md.getDestination();
+ this.lastMessageId = md.getMessage().getMessageId();
+ this.messageCount = messageCount;
+ }
+
+ /**
+ * Copies the state of this acknowledgement into the supplied instance.
+ * All copied fields are assigned by reference or by value.
+ * <p>
+ * NOTE(review): messageCount and consumerKey are not copied here, so the
+ * target keeps its own values for them - confirm whether that is intended.
+ *
+ * @param copy the acknowledgement that receives this instance's state
+ */
+ public void copy(MessageAck copy) {
+ super.copy(copy);
+ copy.firstMessageId = firstMessageId;
+ copy.lastMessageId = lastMessageId;
+ copy.destination = destination;
+ copy.transactionId = transactionId;
+ copy.ackType = ackType;
+ copy.consumerId = consumerId;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isMessageAck() {
+ return true;
+ }
+
+ public boolean isPoisonAck() {
+ return ackType == POSION_ACK_TYPE;
+ }
+
+ public boolean isStandardAck() {
+ return ackType == STANDARD_ACK_TYPE;
+ }
+
+ public boolean isDeliveredAck() {
+ return ackType == DELIVERED_ACK_TYPE;
+ }
+
+ public boolean isRedeliveredAck() {
+ return ackType == REDELIVERED_ACK_TYPE;
+ }
+
+ public boolean isIndividualAck() {
+ return ackType == INDIVIDUAL_ACK_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public TransactionId getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public boolean isInTransaction() {
+ return transactionId != null;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(ConsumerId consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public byte getAckType() {
+ return ackType;
+ }
+
+ public void setAckType(byte ackType) {
+ this.ackType = ackType;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public MessageId getFirstMessageId() {
+ return firstMessageId;
+ }
+
+ public void setFirstMessageId(MessageId firstMessageId) {
+ this.firstMessageId = firstMessageId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public MessageId getLastMessageId() {
+ return lastMessageId;
+ }
+
+ public void setLastMessageId(MessageId lastMessageId) {
+ this.lastMessageId = lastMessageId;
+ }
+
+ /**
+ * The number of messages being acknowledged in the range.
+ *
+ * @openwire:property version=1
+ */
+ public int getMessageCount() {
+ return messageCount;
+ }
+
+ public void setMessageCount(int messageCount) {
+ this.messageCount = messageCount;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processMessageAck(this);
+ }
+
+ /**
+ * A helper method to allow a single message ID to be acknowledged
+ */
+ public void setMessageID(MessageId messageID) {
+ setFirstMessageId(messageID);
+ setLastMessageId(messageID);
+ setMessageCount(1);
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ *
+ * @openwire:marshaller code="21"
+ */
+public class MessageDispatch extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
+
+ protected ConsumerId consumerId;
+ protected ActiveMQDestination destination;
+ protected Message message;
+ protected int redeliveryCounter;
+
+ protected transient long deliverySequenceId;
+ protected transient Object consumer;
+ protected transient Runnable transmitCallback;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isMessageDispatch() {
+ return true;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(ConsumerId consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public Message getMessage() {
+ return message;
+ }
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+ public long getDeliverySequenceId() {
+ return deliverySequenceId;
+ }
+
+ public void setDeliverySequenceId(long deliverySequenceId) {
+ this.deliverySequenceId = deliverySequenceId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getRedeliveryCounter() {
+ return redeliveryCounter;
+ }
+
+ public void setRedeliveryCounter(int deliveryCounter) {
+ this.redeliveryCounter = deliveryCounter;
+ }
+
+ public Object getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(Object consumer) {
+ this.consumer = consumer;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processMessageDispatch(this);
+ }
+
+ public Runnable getTransmitCallback() {
+ return transmitCallback;
+ }
+
+ public void setTransmitCallback(Runnable transmitCallback) {
+ this.transmitCallback = transmitCallback;
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="90"
+ */
+public class MessageDispatchNotification extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
+
+ protected ConsumerId consumerId;
+ protected ActiveMQDestination destination;
+ protected MessageId messageId;
+ protected long deliverySequenceId;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isMessageDispatchNotification() {
+ return true;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(ConsumerId consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+
+ public long getDeliverySequenceId() {
+ return deliverySequenceId;
+ }
+
+ public void setDeliverySequenceId(long deliverySequenceId) {
+ this.deliverySequenceId = deliverySequenceId;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processMessageDispatchNotification(this);
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(MessageId messageId) {
+ this.messageId = messageId;
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Identifies a message by the id of its producer plus a per-producer
+ * sequence number. A broker sequence id is carried alongside but takes no
+ * part in equals(), hashCode() or toString().
+ *
+ * The text form and the hash are computed lazily and cached. The cached hash
+ * is dropped whenever one of the identity fields is changed through this
+ * class so that hashCode() stays consistent with equals().
+ *
+ * @openwire:marshaller code="110"
+ * @version $Revision: 1.12 $
+ */
+public class MessageId implements DataStructure, Comparable<MessageId> {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
+
+ protected ProducerId producerId;
+ protected long producerSequenceId;
+ protected long brokerSequenceId;
+
+ // Lazily built text form; may instead hold a text view set via setTextView.
+ private transient String key;
+ // Lazily computed hash; 0 means "not computed yet".
+ private transient int hashCode;
+
+ /**
+ * Creates an id backed by an empty ProducerId and sequence number 0.
+ */
+ public MessageId() {
+ this.producerId = new ProducerId();
+ }
+
+ /**
+ * Creates an id from the producer id held by the given ProducerInfo.
+ *
+ * @param producerInfo source of the producer id; must not be null
+ * @param producerSequenceId sequence number within that producer
+ */
+ public MessageId(ProducerInfo producerInfo, long producerSequenceId) {
+ this.producerId = producerInfo.getProducerId();
+ this.producerSequenceId = producerSequenceId;
+ }
+
+ /**
+ * Creates an id by parsing its text form, see {@link #setValue(String)}.
+ *
+ * @param messageKey text form of the id; must not be null
+ */
+ public MessageId(String messageKey) {
+ setValue(messageKey);
+ }
+
+ /**
+ * Creates an id from the text form of a producer id and a sequence number.
+ */
+ public MessageId(String producerId, long producerSequenceId) {
+ this(new ProducerId(producerId), producerSequenceId);
+ }
+
+ /**
+ * Creates an id from a producer id and a sequence number. The producer id
+ * reference is stored as-is.
+ */
+ public MessageId(ProducerId producerId, long producerSequenceId) {
+ this.producerId = producerId;
+ this.producerSequenceId = producerSequenceId;
+ }
+
+ /**
+ * Sets the value as a String. The text after the last ':' is parsed as the
+ * producer sequence id and the text before it becomes the producer id; when
+ * there is no ':' the whole string is used as the producer id and the
+ * sequence id is left untouched.
+ *
+ * @param messageKey text form of the id; must not be null
+ * @throws NumberFormatException if the text after the last ':' is not a long
+ */
+ public void setValue(String messageKey) {
+ key = messageKey;
+ // Parse off the sequenceId
+ int p = messageKey.lastIndexOf(":");
+ if (p >= 0) {
+ producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
+ messageKey = messageKey.substring(0, p);
+ }
+ producerId = new ProducerId(messageKey);
+ // The identity fields changed, so a previously cached hash is no longer valid.
+ hashCode = 0;
+ }
+
+ /**
+ * Sets the transient text view of the message which will be ignored if the
+ * message is marshaled on a transport; so is only for in-JVM changes to
+ * accommodate foreign JMS message IDs
+ */
+ public void setTextView(String key) {
+ this.key = key;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * Two ids are equal when they are of the same class and have the same
+ * producer sequence id and equal producer ids (two null producer ids count
+ * as equal). The broker sequence id and the text view are ignored.
+ */
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+
+ MessageId id = (MessageId)o;
+ if (producerSequenceId != id.producerSequenceId) {
+ return false;
+ }
+ // Null-safe: a producer id may have been cleared through setProducerId(null).
+ return producerId == null ? id.producerId == null : producerId.equals(id.producerId);
+ }
+
+ /**
+ * Hash over the same fields equals() compares; computed on first use and
+ * cached until an identity field is changed.
+ */
+ public int hashCode() {
+ if (hashCode == 0) {
+ int producerHash = producerId == null ? 0 : producerId.hashCode();
+ hashCode = producerHash ^ (int)producerSequenceId;
+ }
+ return hashCode;
+ }
+
+ /**
+ * Returns the cached text form, building it as
+ * "producerId:producerSequenceId" on first use. If a text view was set via
+ * setTextView or setValue, that text is returned instead.
+ */
+ public String toString() {
+ if (key == null) {
+ key = producerId.toString() + ":" + producerSequenceId;
+ }
+ return key;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ProducerId getProducerId() {
+ return producerId;
+ }
+
+ /**
+ * Replaces the producer id and drops the cached hash. The cached text form
+ * is deliberately left alone because it may hold a text view set through
+ * setTextView.
+ */
+ public void setProducerId(ProducerId producerId) {
+ this.producerId = producerId;
+ this.hashCode = 0;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getProducerSequenceId() {
+ return producerSequenceId;
+ }
+
+ /**
+ * Replaces the producer sequence id and drops the cached hash. The cached
+ * text form is deliberately left alone because it may hold a text view set
+ * through setTextView.
+ */
+ public void setProducerSequenceId(long producerSequenceId) {
+ this.producerSequenceId = producerSequenceId;
+ this.hashCode = 0;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getBrokerSequenceId() {
+ return brokerSequenceId;
+ }
+
+ public void setBrokerSequenceId(long brokerSequenceId) {
+ this.brokerSequenceId = brokerSequenceId;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ /**
+ * Returns a new id sharing this id's producer id reference and carrying the
+ * same sequence ids and cached text form.
+ */
+ public MessageId copy() {
+ MessageId copy = new MessageId(producerId, producerSequenceId);
+ copy.key = key;
+ copy.brokerSequenceId = brokerSequenceId;
+ return copy;
+ }
+
+ /**
+ * Orders ids by their text form (see toString()).
+ *
+ * @param other the id to compare against; may be null
+ * @return -1 when other is null, otherwise the result of comparing the two
+ * text forms as Strings
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(MessageId other) {
+ int result = -1;
+ if (other != null) {
+ result = this.toString().compareTo(other.toString());
+ }
+ return result;
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Used to pull messages on demand.
+ *
+ * Carries a consumer id, a destination, a timeout and two optional selectors
+ * (correlation id and message id); visit() routes it to
+ * CommandVisitor.processMessagePull.
+ *
+ * @openwire:marshaller code="20"
+ *
+ */
+public class MessagePull extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
+
+ protected ConsumerId consumerId;
+ protected ActiveMQDestination destination;
+ protected long timeout;
+ private MessageId messageId;
+ private String correlationId;
+
+ public byte getDataStructureType() { // Reports DATA_STRUCTURE_TYPE as this command's type code.
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception { // Visitor dispatch: delegates to visitor.processMessagePull(this); exceptions propagate.
+ return visitor.processMessagePull(this);
+ }
+
+ /**
+ * Configures a message pull from the consumer information: copies the
+ * consumer id and the destination from the given ConsumerInfo through this
+ * class's setters. Timeout, correlation id and message id are not touched.
+ */
+ public void configure(ConsumerInfo info) {
+ setConsumerId(info.getConsumerId());
+ setDestination(info.getDestination());
+ }
+
+ /**
+ * Returns the consumer id this pull is issued for.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(ConsumerId consumerId) { // Stores the reference directly; null is not rejected.
+ this.consumerId = consumerId;
+ }
+
+ /**
+ * Returns the destination stored on this pull.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) { // Stores the reference directly; null is not rejected.
+ this.destination = destination;
+ }
+
+ /**
+ * Returns the stored timeout. NOTE(review): unit and the meaning of zero or
+ * negative values are not defined in this class - confirm against the
+ * broker-side handling.
+ *
+ * @openwire:property version=1
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) { // Overwrites the stored timeout; no range check is applied.
+ this.timeout = timeout;
+ }
+
+ /**
+ * An optional correlation ID which could be used by a broker to decide which messages are pulled
+ * on demand from a queue for a consumer
+ *
+ * @openwire:property version=3
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(String correlationId) { // Stores the given value; null leaves the pull without a correlation id.
+ this.correlationId = correlationId;
+ }
+
+
+ /**
+ * An optional message ID which could be used by a broker to decide which messages are pulled
+ * on demand from a queue for a consumer
+ *
+ * @openwire:property version=3
+ */
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(MessageId messageId) { // Stores the given reference; null leaves the pull without a message id.
+ this.messageId = messageId;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.util.Arrays;
+
+import org.apache.activemq.apollo.filter.BooleanExpression;
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.filter.Filterable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller code="91"
+ * @version $Revision: 1.12 $
+ */
+public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
+ static final Log LOG = LogFactory.getLog(NetworkBridgeFilter.class);
+
+ private BrokerId networkBrokerId;
+ private int networkTTL;
+
+ public NetworkBridgeFilter() {
+ }
+
+ public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
+ this.networkBrokerId = remoteBrokerPath;
+ this.networkTTL = networkTTL;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ public boolean matches(Filterable filterable) throws FilterException {
+ return matchesForwardingFilter((Message)filterable);
+ }
+
+ public Object evaluate(Filterable filterable) throws FilterException {
+ return matches(filterable) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected boolean matchesForwardingFilter(Message message) {
+
+ if (contains(message.getBrokerPath(), networkBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message all ready routed once through this broker ("
+ + networkBrokerId + "), path: "
+ + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
+ }
+ return false;
+ }
+
+ int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+
+ if (hops >= networkTTL) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
+ }
+ return false;
+ }
+
+ // Don't propagate advisory messages about network subscriptions
+ if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
+ ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
+ hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
+ if (hops >= networkTTL) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
+ if (brokerPath != null && brokerId != null) {
+ for (int i = 0; i < brokerPath.length; i++) {
+ if (brokerId.equals(brokerPath[i])) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getNetworkTTL() {
+ return networkTTL;
+ }
+
+ public void setNetworkTTL(int networkTTL) {
+ this.networkTTL = networkTTL;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public BrokerId getNetworkBrokerId() {
+ return networkBrokerId;
+ }
+
+ public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
+ this.networkBrokerId = remoteBrokerPath;
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Represents a partial command; a large command that has been split up into
+ * pieces. Each piece carries a command id and a chunk of bytes. Every
+ * type-query method answers false, and visit() always fails because this
+ * class is not meant to reach a CommandVisitor.
+ *
+ * @openwire:marshaller code="60"
+ * @version $Revision: 920306 $
+ */
+public class PartialCommand implements Command {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND;
+
+ private int commandId;
+ private byte[] data;
+
+ private transient Endpoint from;
+ private transient Endpoint to;
+
+ public PartialCommand() {
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * Returns the command id stored on this piece.
+ *
+ * @openwire:property version=1
+ */
+ public int getCommandId() {
+ return this.commandId;
+ }
+
+ public void setCommandId(int commandId) {
+ this.commandId = commandId;
+ }
+
+ /**
+ * The data for this part of the command. The internal array is returned
+ * without copying.
+ *
+ * @openwire:property version=1 mandatory=true
+ */
+ public byte[] getData() {
+ return this.data;
+ }
+
+ /**
+ * Stores the given array reference without copying it.
+ */
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ /**
+ * Returns the transient endpoint stored via setFrom.
+ */
+ public Endpoint getFrom() {
+ return this.from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * Returns the transient endpoint stored via setTo.
+ */
+ public Endpoint getTo() {
+ return this.to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
+ /**
+ * Always throws: partial commands are expected to be filtered out before
+ * visitor dispatch.
+ *
+ * @throws IllegalStateException on every call
+ */
+ public Response visit(CommandVisitor visitor) throws Exception {
+ throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
+ }
+
+ public boolean isResponseRequired() {
+ return false;
+ }
+
+ public boolean isResponse() {
+ return false;
+ }
+
+ public boolean isBrokerInfo() {
+ return false;
+ }
+
+ public boolean isMessageDispatch() {
+ return false;
+ }
+
+ public boolean isMessage() {
+ return false;
+ }
+
+ public boolean isMessageAck() {
+ return false;
+ }
+
+ public boolean isMessageDispatchNotification() {
+ return false;
+ }
+
+ public boolean isShutdownInfo() {
+ return false;
+ }
+
+ public boolean isConnectionControl() {
+ return false;
+ }
+
+ /**
+ * Ignored: isResponseRequired() answers false regardless.
+ */
+ public void setResponseRequired(boolean responseRequired) {
+ }
+
+ public boolean isWireFormatInfo() {
+ return false;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ /**
+ * Describes this piece by command id and payload length; a missing payload
+ * is reported as 0 bytes.
+ */
+ public String toString() {
+ final int byteCount = (data == null) ? 0 : data.length;
+ return "PartialCommand[id: " + commandId + " data: " + byteCount + " byte(s)]";
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * A ProducerAck command is sent by a broker to a producer to let it know it has
+ * received and processed messages that it has produced. The producer will be
+ * flow controlled if it does not receive ProducerAck commands back from the
+ * broker.
+ *
+ * Holds the target producer id and the acknowledged size; visit() routes it
+ * to CommandVisitor.processProducerAck.
+ *
+ * @openwire:marshaller code="19" version="3"
+ * @version $Revision: 1.11 $
+ */
+public class ProducerAck extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK;
+
+ protected ProducerId producerId;
+ protected int size;
+
+ public ProducerAck() { // Leaves producerId null and size 0.
+ }
+
+ public ProducerAck(ProducerId producerId, int size) { // Stores both values directly; no validation.
+ this.producerId = producerId;
+ this.size = size;
+ }
+
+ public void copy(ProducerAck copy) { // The argument is the TARGET: this ack's state is written into it, after super.copy(copy) has run.
+ super.copy(copy);
+ copy.producerId = producerId;
+ copy.size = size;
+ }
+
+ public byte getDataStructureType() { // Reports DATA_STRUCTURE_TYPE as this command's type code.
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception { // Visitor dispatch: delegates to visitor.processProducerAck(this); exceptions propagate.
+ return visitor.processProducerAck(this);
+ }
+
+ /**
+ * The producer id that this ack message is destined for.
+ *
+ * @openwire:property version=3
+ */
+ public ProducerId getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(ProducerId producerId) { // Stores the reference directly; null is not rejected.
+ this.producerId = producerId;
+ }
+
+ /**
+ * The number of bytes that are being acked.
+ *
+ * @openwire:property version=3
+ */
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) { // Overwrites the stored size; no range check is applied.
+ this.size = size;
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Identifies a producer by connection id, session id and a per-session
+ * producer value. The text form is "connectionId:sessionId:value".
+ *
+ * The hash, the text form and the parent SessionId are derived lazily and
+ * cached; every setter drops those caches so they can never disagree with
+ * the current field values.
+ *
+ * @openwire:marshaller code="123"
+ */
+public class ProducerId implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
+
+ protected String connectionId;
+ protected long sessionId;
+ protected long value;
+
+ // Derived, lazily computed state; see resetCachedState().
+ protected transient int hashCode;
+ protected transient String key;
+ protected transient SessionId parentId;
+
+ /**
+ * Creates an empty id: null connection id, session id 0, value 0.
+ */
+ public ProducerId() {
+ }
+
+ /**
+ * Creates an id under the given session.
+ *
+ * @param sessionId supplies the connection id and the session value; must not be null
+ * @param producerId the per-session producer value
+ */
+ public ProducerId(SessionId sessionId, long producerId) {
+ this.connectionId = sessionId.getConnectionId();
+ this.sessionId = sessionId.getValue();
+ this.value = producerId;
+ }
+
+ /**
+ * Copy constructor; cached state is not copied.
+ */
+ public ProducerId(ProducerId id) {
+ this.connectionId = id.getConnectionId();
+ this.sessionId = id.getSessionId();
+ this.value = id.getValue();
+ }
+
+ /**
+ * Parses the text form. The text after the last ':' becomes the producer
+ * value, the text after the second-to-last ':' becomes the session id and
+ * whatever precedes it becomes the connection id. Missing segments leave the
+ * corresponding field at 0.
+ *
+ * @param producerKey text form of the id; must not be null
+ * @throws NumberFormatException if a parsed segment is not a long
+ */
+ public ProducerId(String producerKey) {
+ // Parse off the producerId
+ int p = producerKey.lastIndexOf(":");
+ if (p >= 0) {
+ value = Long.parseLong(producerKey.substring(p + 1));
+ producerKey = producerKey.substring(0, p);
+ }
+ setProducerSessionKey(producerKey);
+ }
+
+ /**
+ * Returns the SessionId built from this producer id, creating and caching
+ * it on first use.
+ */
+ public SessionId getParentId() {
+ if (parentId == null) {
+ parentId = new SessionId(this);
+ }
+ return parentId;
+ }
+
+ /**
+ * Hash over connection id, session id and value; a null connection id
+ * contributes 0 instead of failing.
+ */
+ public int hashCode() {
+ if (hashCode == 0) {
+ int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+ hashCode = connectionHash ^ (int)sessionId ^ (int)value;
+ }
+ return hashCode;
+ }
+
+ /**
+ * Two ids are equal when both are exactly ProducerId instances with the
+ * same session id, value and connection id (two null connection ids count
+ * as equal).
+ */
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != ProducerId.class) {
+ return false;
+ }
+ ProducerId id = (ProducerId)o;
+ if (sessionId != id.sessionId || value != id.value) {
+ return false;
+ }
+ // Null-safe: the no-arg constructor leaves connectionId null.
+ return connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId);
+ }
+
+ /**
+ * Splits "connectionId:sessionId" into its two fields.
+ *
+ * @param sessionKey the text left after the producer value was removed
+ */
+ private void setProducerSessionKey(String sessionKey) {
+ // Parse off the session id
+ int p = sessionKey.lastIndexOf(":");
+ if (p >= 0) {
+ sessionId = Long.parseLong(sessionKey.substring(p + 1));
+ sessionKey = sessionKey.substring(0, p);
+ }
+ // The rest is the connection id
+ connectionId = sessionKey;
+ }
+
+ // Drops every lazily derived value so it is rebuilt from the current fields.
+ private void resetCachedState() {
+ hashCode = 0;
+ key = null;
+ parentId = null;
+ }
+
+ /**
+ * Returns "connectionId:sessionId:value", built on first use and cached.
+ */
+ public String toString() {
+ if (key == null) {
+ key = connectionId + ":" + sessionId + ":" + value;
+ }
+ return key;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(String connectionId) {
+ this.connectionId = connectionId;
+ resetCachedState();
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getValue() {
+ return value;
+ }
+
+ public void setValue(long producerId) {
+ this.value = producerId;
+ resetCachedState();
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(long sessionId) {
+ this.sessionId = sessionId;
+ resetCachedState();
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Command describing a producer: its id, an optional destination, the broker
+ * path the command has travelled, and two producer settings (async dispatch
+ * and window size). visit() routes it to CommandVisitor.processAddProducer.
+ *
+ * @openwire:marshaller code="6"
+ * @version $Revision: 1.13 $
+ */
+public class ProducerInfo extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
+
+ protected ProducerId producerId;
+ protected ActiveMQDestination destination;
+ protected BrokerId[] brokerPath;
+ protected boolean dispatchAsync;
+ protected int windowSize;
+
+ public ProducerInfo() {
+ }
+
+ public ProducerInfo(ProducerId producerId) {
+ this.producerId = producerId;
+ }
+
+ /**
+ * Creates the info for a new producer under the given session.
+ *
+ * @param sessionInfo supplies the session id; must not be null
+ * @param producerId the per-session producer value
+ */
+ public ProducerInfo(SessionInfo sessionInfo, long producerId) {
+ this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
+ }
+
+ /**
+ * Returns a new ProducerInfo populated through {@link #copy(ProducerInfo)}.
+ */
+ public ProducerInfo copy() {
+ ProducerInfo info = new ProducerInfo();
+ copy(info);
+ return info;
+ }
+
+ /**
+ * Writes this command's state into the given instance (the argument is the
+ * target). All fields declared by this class are transferred; object
+ * references, including the broker path array, are shared rather than
+ * cloned.
+ */
+ public void copy(ProducerInfo info) {
+ super.copy(info);
+ info.producerId = producerId;
+ info.destination = destination;
+ // These three were previously left at their defaults, so a copy lost them.
+ info.brokerPath = brokerPath;
+ info.dispatchAsync = dispatchAsync;
+ info.windowSize = windowSize;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ProducerId getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(ProducerId producerId) {
+ this.producerId = producerId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * Builds the RemoveInfo command for this producer's id, carrying over this
+ * command's response-required flag.
+ */
+ public RemoveInfo createRemoveCommand() {
+ RemoveInfo command = new RemoveInfo(getProducerId());
+ command.setResponseRequired(isResponseRequired());
+ return command;
+ }
+
+ /**
+ * The route of brokers the command has moved through.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public BrokerId[] getBrokerPath() {
+ return brokerPath;
+ }
+
+ public void setBrokerPath(BrokerId[] brokerPath) {
+ this.brokerPath = brokerPath;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processAddProducer(this);
+ }
+
+ /**
+ * If the broker should dispatch messages from this producer async. Since
+ * sync dispatch could potentially block the producer thread, this could be
+ * an important setting for the producer.
+ *
+ * @openwire:property version=2
+ */
+ public boolean isDispatchAsync() {
+ return dispatchAsync;
+ }
+
+ public void setDispatchAsync(boolean dispatchAsync) {
+ this.dispatchAsync = dispatchAsync;
+ }
+
+ /**
+ * Used to configure the producer window size. A producer will send up to
+ * the configured window size worth of payload data to the broker before
+ * waiting for an Ack that allows him to send more.
+ *
+ * @openwire:property version=3
+ */
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ public void setWindowSize(int windowSize) {
+ this.windowSize = windowSize;
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Removes a consumer, producer, session or connection.
+ *
+ * @openwire:marshaller code="12"
+ */
+public class RemoveInfo extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
+
+ protected DataStructure objectId;
+ protected long lastDeliveredSequenceId;
+
+ public RemoveInfo() {
+ }
+
+ public RemoveInfo(DataStructure objectId) {
+ this.objectId = objectId;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public DataStructure getObjectId() {
+ return objectId;
+ }
+
+ public void setObjectId(DataStructure objectId) {
+ this.objectId = objectId;
+ }
+
+ /**
+ * @openwire:property version=5 cache=false
+ */
+ public long getLastDeliveredSequenceId() {
+ return lastDeliveredSequenceId;
+ }
+
+ public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
+ this.lastDeliveredSequenceId = lastDeliveredSequenceId;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ switch (objectId.getDataStructureType()) {
+ case ConnectionId.DATA_STRUCTURE_TYPE:
+ return visitor.processRemoveConnection(this, (ConnectionId)objectId, lastDeliveredSequenceId);
+ case SessionId.DATA_STRUCTURE_TYPE:
+ return visitor.processRemoveSession(this, (SessionId)objectId, lastDeliveredSequenceId);
+ case ConsumerId.DATA_STRUCTURE_TYPE:
+ return visitor.processRemoveConsumer(this, (ConsumerId)objectId, lastDeliveredSequenceId);
+ case ProducerId.DATA_STRUCTURE_TYPE:
+ return visitor.processRemoveProducer(this, (ProducerId)objectId);
+ default:
+ throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
+ }
+ }
+
+ /**
+ * Returns true if this event is for a removed connection
+ */
+ public boolean isConnectionRemove() {
+ return objectId.getDataStructureType() == ConnectionId.DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * Returns true if this event is for a removed session
+ */
+ public boolean isSessionRemove() {
+ return objectId.getDataStructureType() == SessionId.DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * Returns true if this event is for a removed consumer
+ */
+ public boolean isConsumerRemove() {
+ return objectId.getDataStructureType() == ConsumerId.DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * Returns true if this event is for a removed producer
+ */
+ public boolean isProducerRemove() {
+ return objectId.getDataStructureType() == ProducerId.DATA_STRUCTURE_TYPE;
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Command carrying a connection id, a client id and a subscription name. It
+ * is dispatched to {@code CommandVisitor.processRemoveSubscription}.
+ *
+ * @openwire:marshaller code="9"
+ * @version $Revision: 1.7 $
+ */
+public class RemoveSubscriptionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
+
+    protected ConnectionId connectionId;
+    protected String clientId;
+    protected String subscriptionName;
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns the connection id carried by this command.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @param connectionId the connection id to carry
+     */
+    public void setConnectionId(final ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Returns the subscription name carried by this command.
+     *
+     * @openwire:property version=1
+     */
+    public String getSubscriptionName() {
+        return this.subscriptionName;
+    }
+
+    /**
+     * @param subscriptionName the subscription name to carry
+     */
+    public void setSubscriptionName(final String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * Returns the client id carried by this command.
+     *
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return this.clientId;
+    }
+
+    /**
+     * @param clientId the client id to carry
+     */
+    public void setClientId(final String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * Hands this command to the visitor's remove-subscription callback.
+     *
+     * @param visitor the visitor that processes this command
+     * @return whatever {@code visitor.processRemoveSubscription(this)} returns
+     * @throws Exception if the visitor callback throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        final Response response = visitor.processRemoveSubscription(this);
+        return response;
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * A general purpose replay command for some kind of producer where ranges of
+ * messages are asked to be replayed. This command is typically used over a
+ * non-reliable transport such as UDP or multicast but could also be used on
+ * TCP/IP if a socket has been re-established.
+ *
+ * @openwire:marshaller code="65"
+ */
+public class ReplayCommand extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
+
+    // NOTE(review): the openwire property tags for producerId, firstAckNumber
+    // and lastAckNumber sit on the setters, while those for firstNakNumber and
+    // lastNakNumber sit on the getters. Left exactly as found; confirm against
+    // the marshaller generator before moving any of them.
+    private String producerId;
+    private int firstAckNumber;
+    private int lastAckNumber;
+    private int firstNakNumber;
+    private int lastNakNumber;
+
+    /**
+     * Creates a command with all fields at their Java defaults.
+     */
+    public ReplayCommand() {
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getProducerId() {
+        return this.producerId;
+    }
+
+    /**
+     * Is used to uniquely identify the producer of the sequence
+     *
+     * @openwire:property version=1 cache=false
+     */
+    public void setProducerId(final String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getFirstAckNumber() {
+        return this.firstAckNumber;
+    }
+
+    /**
+     * Is used to specify the first sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     *
+     * @openwire:property version=1
+     */
+    public void setFirstAckNumber(final int firstSequenceNumber) {
+        this.firstAckNumber = firstSequenceNumber;
+    }
+
+    public int getLastAckNumber() {
+        return this.lastAckNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     *
+     * @openwire:property version=1
+     */
+    public void setLastAckNumber(final int lastSequenceNumber) {
+        this.lastAckNumber = lastSequenceNumber;
+    }
+
+    /**
+     * No visitor callback is invoked for this command.
+     *
+     * @param visitor ignored
+     * @return always {@code null}
+     */
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    /**
+     * Is used to specify the first sequence number to be replayed
+     *
+     * @openwire:property version=1
+     */
+    public int getFirstNakNumber() {
+        return this.firstNakNumber;
+    }
+
+    public void setFirstNakNumber(final int firstNakNumber) {
+        this.firstNakNumber = firstNakNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number to be replayed
+     *
+     * @openwire:property version=1
+     */
+    public int getLastNakNumber() {
+        return this.lastNakNumber;
+    }
+
+    public void setLastNakNumber(final int lastNakNumber) {
+        this.lastNakNumber = lastNakNumber;
+    }
+
+    /**
+     * Renders the command id and the Nak range; the producer id and the Ack
+     * range are not part of the text.
+     */
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ReplayCommand {commandId = ");
+        text.append(getCommandId());
+        text.append(", firstNakNumber = ").append(getFirstNakNumber());
+        text.append(", lastNakNumber = ").append(getLastNakNumber());
+        text.append("}");
+        return text.toString();
+    }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Command that carries a correlation id and reports itself as a response
+ * (and not as an exception).
+ *
+ * @openwire:marshaller code="30"
+ * @version $Revision: 1.6 $
+ */
+public class Response extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
+    // Package-private, as in the original declaration.
+    int correlationId;
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns the correlation id carried by this response.
+     *
+     * @openwire:property version=1
+     */
+    public int getCorrelationId() {
+        return this.correlationId;
+    }
+
+    /**
+     * @param responseId the value to store as the correlation id
+     */
+    public void setCorrelationId(final int responseId) {
+        this.correlationId = responseId;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isResponse() {
+        return true;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    public boolean isException() {
+        return false;
+    }
+
+    /**
+     * No visitor callback is invoked for this command.
+     *
+     * @param visitor ignored
+     * @return always {@code null}
+     */
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return null;
+    }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
------------------------------------------------------------------------------
svn:executable = *