You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC

svn commit: r1097189 [39/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.advisory.AdvisorySupport;
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.filter.Filterable;
+import org.apache.activemq.apollo.openwire.support.broker.region.MessageReference;
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ * Represents an ActiveMQ message
+ * 
+ * @openwire:marshaller
+ */
+public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
+
+    /**
+     * The default minimum amount of memory a message is assumed to use
+     */
+    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
+
+    protected MessageId messageId;
+    protected ActiveMQDestination originalDestination;
+    protected TransactionId originalTransactionId;
+
+    protected ProducerId producerId;
+    protected ActiveMQDestination destination;
+    protected TransactionId transactionId;
+
+    protected long expiration;
+    protected long timestamp;
+    protected long arrival;
+    protected long brokerInTime;
+    protected long brokerOutTime;
+    protected String correlationId;
+    protected ActiveMQDestination replyTo;
+    protected boolean persistent;
+    protected String type;
+    protected byte priority;
+    protected String groupID;
+    protected int groupSequence;
+    protected ConsumerId targetConsumerId;
+    protected boolean compressed;
+    protected String userID;
+
+    protected Buffer content;
+    protected Buffer marshalledProperties;
+    protected DataStructure dataStructure;
+    protected int redeliveryCounter;
+
+    protected int size;
+    protected Map<String, Object> properties;
+    protected boolean readOnlyProperties;
+    protected boolean readOnlyBody;
+    protected transient boolean recievedByDFBridge;
+    protected boolean droppable;
+
+    private transient short referenceCount;
+
+    private BrokerId[] brokerPath;
+    private BrokerId[] cluster;
+
+    public abstract org.apache.activemq.apollo.openwire.command.Message copy();
+    public abstract void clearBody() throws OpenwireException;
+
+    protected void copy(org.apache.activemq.apollo.openwire.command.Message copy) {
+        super.copy(copy);
+        copy.producerId = producerId;
+        copy.transactionId = transactionId;
+        copy.destination = destination;
+        copy.messageId = messageId != null ? messageId.copy() : null;
+        copy.originalDestination = originalDestination;
+        copy.originalTransactionId = originalTransactionId;
+        copy.expiration = expiration;
+        copy.timestamp = timestamp;
+        copy.correlationId = correlationId;
+        copy.replyTo = replyTo;
+        copy.persistent = persistent;
+        copy.redeliveryCounter = redeliveryCounter;
+        copy.type = type;
+        copy.priority = priority;
+        copy.size = size;
+        copy.groupID = groupID;
+        copy.userID = userID;
+        copy.groupSequence = groupSequence;
+
+        if (properties != null) {
+            copy.properties = new HashMap<String, Object>(properties);
+        } else {
+            copy.properties = properties;
+        }
+
+        copy.content = content;
+        copy.marshalledProperties = marshalledProperties;
+        copy.dataStructure = dataStructure;
+        copy.readOnlyProperties = readOnlyProperties;
+        copy.readOnlyBody = readOnlyBody;
+        copy.compressed = compressed;
+        copy.recievedByDFBridge = recievedByDFBridge;
+
+        copy.arrival = arrival;
+        copy.brokerInTime = brokerInTime;
+        copy.brokerOutTime = brokerOutTime;
+        copy.brokerPath = brokerPath;
+
+        // lets not copy the following fields
+        // copy.targetConsumerId = targetConsumerId;
+        // copy.referenceCount = referenceCount;
+    }
+
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return Collections.EMPTY_MAP;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                properties = new HashMap<String, Object>();
+            } else {
+                properties = unmarsallProperties(marshalledProperties);
+                marshalledProperties = null;
+            }
+        }
+    }
+
+    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
+    }
+
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            MarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toBuffer();
+        }
+    }
+
+    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    //
+    // Simple Field accessors
+    //
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getOriginalDestination() {
+        return originalDestination;
+    }
+
+    public void setOriginalDestination(ActiveMQDestination destination) {
+        this.originalDestination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getOriginalTransactionId() {
+        return originalTransactionId;
+    }
+
+    public void setOriginalTransactionId(TransactionId transactionId) {
+        this.originalTransactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getGroupID() {
+        return groupID;
+    }
+
+    public void setGroupID(String groupID) {
+        this.groupID = groupID;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getGroupSequence() {
+        return groupSequence;
+    }
+
+    public void setGroupSequence(int groupSequence) {
+        this.groupSequence = groupSequence;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean deliveryMode) {
+        this.persistent = deliveryMode;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getExpiration() {
+        return expiration;
+    }
+
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getPriority() {
+        return priority;
+    }
+
+    public void setPriority(byte priority) {
+        this.priority = priority;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public ActiveMQDestination getReplyTo() {
+        return replyTo;
+    }
+
+    public void setReplyTo(ActiveMQDestination replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getContent() {
+        return content;
+    }
+
+    public void setContent(Buffer content) {
+        this.content = content;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(Buffer marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public DataStructure getDataStructure() {
+        return dataStructure;
+    }
+
+    public void setDataStructure(DataStructure data) {
+        this.dataStructure = data;
+    }
+
+    /**
+     * Can be used to route the message to a specific consumer. Should be null
+     * to allow the broker use normal JMS routing semantics. If the target
+     * consumer id is an active consumer on the broker, the message is dropped.
+     * Used by the AdvisoryBroker to replay advisory messages to a specific
+     * consumer.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getTargetConsumerId() {
+        return targetConsumerId;
+    }
+
+    public void setTargetConsumerId(ConsumerId targetConsumerId) {
+        this.targetConsumerId = targetConsumerId;
+    }
+
+    public boolean isExpired() {
+        long expireTime = getExpiration();
+        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+            return true;
+        }
+        return false;
+    }
+
+    public boolean isAdvisory() {
+        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
+
+    public boolean isRedelivered() {
+        return redeliveryCounter > 0;
+    }
+
+    public void setRedelivered(boolean redelivered) {
+        if (redelivered) {
+            if (!isRedelivered()) {
+                setRedeliveryCounter(1);
+            }
+        } else {
+            if (isRedelivered()) {
+                setRedeliveryCounter(0);
+            }
+        }
+    }
+
+    public void incrementRedeliveryCounter() {
+        redeliveryCounter++;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return redeliveryCounter;
+    }
+
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    public boolean isReadOnlyProperties() {
+        return readOnlyProperties;
+    }
+
+    public void setReadOnlyProperties(boolean readOnlyProperties) {
+        this.readOnlyProperties = readOnlyProperties;
+    }
+
+    public boolean isReadOnlyBody() {
+        return readOnlyBody;
+    }
+
+    public void setReadOnlyBody(boolean readOnlyBody) {
+        this.readOnlyBody = readOnlyBody;
+    }
+
+    /**
+     * Used to schedule the arrival time of a message to a broker. The broker
+     * will not dispatch a message to a consumer until it's arrival time has
+     * elapsed.
+     * 
+     * @openwire:property version=1
+     */
+    public long getArrival() {
+        return arrival;
+    }
+
+    public void setArrival(long arrival) {
+        this.arrival = arrival;
+    }
+
+    /**
+     * Only set by the broker and defines the userID of the producer connection
+     * who sent this message. This is an optional field, it needs to be enabled
+     * on the broker to have this field populated.
+     * 
+     * @openwire:property version=1
+     */
+    public String getUserID() {
+        return userID;
+    }
+
+    public void setUserID(String jmsxUserID) {
+        this.userID = jmsxUserID;
+    }
+
+    public int getReferenceCount() {
+        return referenceCount;
+    }
+
+    public org.apache.activemq.apollo.openwire.command.Message getMessageHardRef() {
+        return this;
+    }
+
+    public org.apache.activemq.apollo.openwire.command.Message getMessage() throws IOException {
+        return this;
+    }
+
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    public int incrementReferenceCount() {
+        int rc;
+        int size;
+        synchronized (this) {
+            rc = ++referenceCount;
+            size = getSize();
+        }
+
+//        if (rc == 1 && getMemoryUsage() != null) {
+//            getMemoryUsage().increaseUsage(size);
+//        }
+
+        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
+        return rc;
+    }
+
+    public int decrementReferenceCount() {
+        int rc;
+        int size;
+        synchronized (this) {
+            rc = --referenceCount;
+            size = getSize();
+        }
+//
+//        if (rc == 0 && getMemoryUsage() != null) {
+//            getMemoryUsage().decreaseUsage(size);
+//        }
+        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
+
+        return rc;
+    }
+
+    public int getSize() {
+        int minimumMessageSize = getMinimumMessageSize();
+        if (size < minimumMessageSize || size == 0) {
+            size = minimumMessageSize;
+            if (marshalledProperties != null) {
+                size += marshalledProperties.getLength();
+            }
+            if (content != null) {
+                size += content.getLength();
+            }
+        }
+        return size;
+    }
+    
+    protected int getMinimumMessageSize() {
+        return DEFAULT_MINIMUM_MESSAGE_SIZE;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the recievedByDFBridge.
+     */
+    public boolean isRecievedByDFBridge() {
+        return recievedByDFBridge;
+    }
+
+    /**
+     * @param recievedByDFBridge The recievedByDFBridge to set.
+     */
+    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
+        this.recievedByDFBridge = recievedByDFBridge;
+    }
+
+    public void onMessageRolledBack() {
+        incrementRedeliveryCounter();
+    }
+
+    /**
+     * @openwire:property version=2 cache=true
+     */
+    public boolean isDroppable() {
+        return droppable;
+    }
+
+    public void setDroppable(boolean droppable) {
+        this.droppable = droppable;
+    }
+
+    /**
+     * If a message is stored in multiple nodes on a cluster, all the cluster
+     * members will be listed here. Otherwise, it will be null.
+     * 
+     * @openwire:property version=3 cache=true
+     */
+    public BrokerId[] getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(BrokerId[] cluster) {
+        this.cluster = cluster;
+    }
+
+    public boolean isMessage() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerInTime() {
+        return this.brokerInTime;
+    }
+
+    public void setBrokerInTime(long brokerInTime) {
+        this.brokerInTime = brokerInTime;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerOutTime() {
+        return this.brokerOutTime;
+    }
+
+    public void setBrokerOutTime(long brokerOutTime) {
+        this.brokerOutTime = brokerOutTime;
+    }
+    
+    public boolean isDropped() {
+        return false;
+    }
+    
+    public String toString() {
+        return toString(null);
+    }
+    
+    public String toString(Map<String, Object>overrideFields) {
+        try {
+            getProperties();
+        } catch (IOException e) {
+        }
+        return super.toString(overrideFields);
+    }
+
+    private static final Map<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+//    private Object destination;
+
+    interface Expression {
+        public Object evaluate(Message mc);
+    }
+
+    static {
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() {
+            public Object evaluate(Message message) {
+                ActiveMQDestination dest = message.getOriginalDestination();
+                if (dest == null) {
+                    dest = message.getDestination();
+                }
+                if (dest == null) {
+                    return null;
+                }
+                return dest.toString();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() {
+            public Object evaluate(Message message) {
+                if (message.getReplyTo() == null) {
+                    return null;
+                }
+                return message.getReplyTo().toString();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() {
+            public Object evaluate(Message message) {
+                return message.getType();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() {
+            public Object evaluate(Message message) {
+                return Integer.valueOf(message.isPersistent() ? 2 : 1);
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() {
+            public Object evaluate(Message message) {
+                return Integer.valueOf(message.getPriority());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new Expression() {
+            public Object evaluate(Message message) {
+                if (message.getMessageId() == null) {
+                    return null;
+                }
+                return message.getMessageId().toString();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getTimestamp());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() {
+            public Object evaluate(Message message) {
+                return message.getCorrelationId();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getExpiration());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() {
+            public Object evaluate(Message message) {
+                return Boolean.valueOf(message.isRedelivered());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSXDeliveryCount", new Expression() {
+            public Object evaluate(Message message) {
+                return Integer.valueOf(message.getRedeliveryCounter() + 1);
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupID", new Expression() {
+            public Object evaluate(Message message) {
+                return message.getGroupID();
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupSeq", new Expression() {
+            public Object evaluate(Message message) {
+                return new Integer(message.getGroupSequence());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSXProducerTXID", new Expression() {
+            public Object evaluate(Message message) {
+                TransactionId txId = message.getOriginalTransactionId();
+                if (txId == null) {
+                    txId = message.getTransactionId();
+                }
+                if (txId == null) {
+                    return null;
+                }
+                return new Integer(txId.toString());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerInTime", new Expression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getBrokerInTime());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerOutTime", new Expression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getBrokerOutTime());
+            }
+        });
+    }
+
+    public Filterable createFilterable() {
+        return new Filterable() {
+            public <T> T getBodyAs(Class<T> type) throws FilterException {
+                try {
+                    if( type == String.class ) {
+                        if ( Message.this instanceof ActiveMQTextMessage ) {
+                            return type.cast(((ActiveMQTextMessage)Message.this).getText());
+                        }
+                    }
+                    if( type == Buffer.class ) {
+                        if ( Message.this instanceof ActiveMQBytesMessage ) {
+                            ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)Message.this);
+                            byte data[] = new byte[(int) bm.getBodyLength()];
+                            bm.readBytes(data);
+                            return type.cast(new Buffer(data));
+                        }
+                    }
+                    return null;
+                } catch (OpenwireException e) {
+                    throw new FilterException(e);
+                }
+            }
+
+            public Object getProperty(String name) {
+                Expression expression = JMS_PROPERTY_EXPRESSIONS.get(name);
+                if( expression != null ) {
+                    return expression.evaluate(Message.this);
+                }
+                try {
+                    return Message.this.getProperty(name);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return null;
+                }
+            }
+
+            public Object getLocalConnectionId() {
+                // TODO:
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="22"
+ * @version $Revision: 1.11 $
+ */
+public class MessageAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
+
+    /**
+     * Used to let the broker know that the message has been delivered to the
+     * client. Message will still be retained until an standard ack is received.
+     * This is used get the broker to send more messages past prefetch limits
+     * when an standard ack has not been sent.
+     */
+    public static final byte DELIVERED_ACK_TYPE = 0;
+
+    /**
+     * The standard ack case where a client wants the message to be discarded.
+     */
+    public static final byte STANDARD_ACK_TYPE = 2;
+
+    /**
+     * In case the client want's to explicitly let the broker know that a
+     * message was not processed and the message was considered a poison
+     * message.
+     */
+    public static final byte POSION_ACK_TYPE = 1;
+
+    /**
+     * In case the client want's to explicitly let the broker know that a
+     * message was not processed and it was re-delivered to the consumer
+     * but it was not yet considered to be a poison message.  The messageCount 
+     * field will hold the number of times the message was re-delivered. 
+     */
+    public static final byte REDELIVERED_ACK_TYPE = 3;
+    
+    /**
+     * The  ack case where a client wants only an individual message to be discarded.
+     */
+    public static final byte INDIVIDUAL_ACK_TYPE = 4;
+    
+    protected byte ackType;
+    protected ConsumerId consumerId;
+    protected MessageId firstMessageId;
+    protected MessageId lastMessageId;
+    protected ActiveMQDestination destination;
+    protected TransactionId transactionId;
+    protected int messageCount;
+
+    protected transient String consumerKey;
+
+    public MessageAck() {
+    }
+
+    public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
+        this.ackType = ackType;
+        this.consumerId = md.getConsumerId();
+        this.destination = md.getDestination();
+        this.lastMessageId = md.getMessage().getMessageId();
+        this.messageCount = messageCount;
+    }
+
+    public void copy(MessageAck copy) {
+        super.copy(copy);
+        copy.firstMessageId = firstMessageId;
+        copy.lastMessageId = lastMessageId;
+        copy.destination = destination;
+        copy.transactionId = transactionId;
+        copy.ackType = ackType;
+        copy.consumerId = consumerId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMessageAck() {
+        return true;
+    }
+
+    public boolean isPoisonAck() {
+        return ackType == POSION_ACK_TYPE;
+    }
+
+    public boolean isStandardAck() {
+        return ackType == STANDARD_ACK_TYPE;
+    }
+
+    public boolean isDeliveredAck() {
+        return ackType == DELIVERED_ACK_TYPE;
+    }
+    
+    public boolean isRedeliveredAck() {
+        return ackType == REDELIVERED_ACK_TYPE;
+    }
+    
+    public boolean isIndividualAck() {
+        return ackType == INDIVIDUAL_ACK_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getAckType() {
+        return ackType;
+    }
+
+    public void setAckType(byte ackType) {
+        this.ackType = ackType;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getFirstMessageId() {
+        return firstMessageId;
+    }
+
+    public void setFirstMessageId(MessageId firstMessageId) {
+        this.firstMessageId = firstMessageId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getLastMessageId() {
+        return lastMessageId;
+    }
+
+    public void setLastMessageId(MessageId lastMessageId) {
+        this.lastMessageId = lastMessageId;
+    }
+
+    /**
+     * The number of messages being acknowledged in the range.
+     * 
+     * @openwire:property version=1
+     */
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageAck(this);
+    }
+
+    /**
+     * A helper method to allow a single message ID to be acknowledged
+     */
+    public void setMessageID(MessageId messageID) {
+        setFirstMessageId(messageID);
+        setLastMessageId(messageID);
+        setMessageCount(1);
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="21"
+ */
+public class MessageDispatch extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected Message message;
+    protected int redeliveryCounter;
+
+    protected transient long deliverySequenceId;
+    protected transient Object consumer;
+    protected transient Runnable transmitCallback;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMessageDispatch() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public long getDeliverySequenceId() {
+        return deliverySequenceId;
+    }
+
+    public void setDeliverySequenceId(long deliverySequenceId) {
+        this.deliverySequenceId = deliverySequenceId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return redeliveryCounter;
+    }
+
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    public Object getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Object consumer) {
+        this.consumer = consumer;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatch(this);
+    }
+
+    public Runnable getTransmitCallback() {
+        return transmitCallback;
+    }
+
+    public void setTransmitCallback(Runnable transmitCallback) {
+        this.transmitCallback = transmitCallback;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="90"
+ */
+public class MessageDispatchNotification extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected MessageId messageId;
+    protected long deliverySequenceId;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+
+    public long getDeliverySequenceId() {
+        return deliverySequenceId;
+    }
+
+    public void setDeliverySequenceId(long deliverySequenceId) {
+        this.deliverySequenceId = deliverySequenceId;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatchNotification(this);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * @openwire:marshaller code="110"
+ * @version $Revision: 1.12 $
+ */
+public class MessageId implements DataStructure, Comparable<MessageId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
+
+    protected ProducerId producerId;
+    protected long producerSequenceId;
+    protected long brokerSequenceId;
+
+    private transient String key;
+    private transient int hashCode;
+
+    public MessageId() {
+        this.producerId = new ProducerId();
+    }
+
+    public MessageId(ProducerInfo producerInfo, long producerSequenceId) {
+        this.producerId = producerInfo.getProducerId();
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    public MessageId(String messageKey) {
+        setValue(messageKey);
+    }
+
+    public MessageId(String producerId, long producerSequenceId) {
+        this(new ProducerId(producerId), producerSequenceId);
+    }
+
+    public MessageId(ProducerId producerId, long producerSequenceId) {
+        this.producerId = producerId;
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * Sets the value as a String
+     */
+    public void setValue(String messageKey) {
+        key = messageKey;
+        // Parse off the sequenceId
+        int p = messageKey.lastIndexOf(":");
+        if (p >= 0) {
+            producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
+            messageKey = messageKey.substring(0, p);
+        }
+        producerId = new ProducerId(messageKey);
+    }
+
+    /**
+     * Sets the transient text view of the message which will be ignored if the
+     * message is marshaled on a transport; so is only for in-JVM changes to
+     * accommodate foreign JMS message IDs
+     */
+    public void setTextView(String key) {
+        this.key = key;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        MessageId id = (MessageId)o;
+        return producerSequenceId == id.producerSequenceId && producerId.equals(id.producerId);
+    }
+
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = producerId.hashCode() ^ (int)producerSequenceId;
+        }
+        return hashCode;
+    }
+
+    public String toString() {
+        if (key == null) {
+            key = producerId.toString() + ":" + producerSequenceId;
+        }
+        return key;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getProducerSequenceId() {
+        return producerSequenceId;
+    }
+
+    public void setProducerSequenceId(long producerSequenceId) {
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getBrokerSequenceId() {
+        return brokerSequenceId;
+    }
+
+    public void setBrokerSequenceId(long brokerSequenceId) {
+        this.brokerSequenceId = brokerSequenceId;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public MessageId copy() {
+        MessageId copy = new MessageId(producerId, producerSequenceId);
+        copy.key = key;
+        copy.brokerSequenceId = brokerSequenceId;
+        return copy;
+    }
+
+    /**
+     * @param o
+     * @return
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    public int compareTo(MessageId other) {
+        int result = -1;
+        if (other != null) {
+            result = this.toString().compareTo(other.toString());
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Used to pull messages on demand.
+ * 
+ * @openwire:marshaller code="20"
+ * 
+ */
+public class MessagePull extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected long timeout;
+    private MessageId messageId;
+    private String correlationId;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return visitor.processMessagePull(this);
+    }
+
+    /**
+     * Configures a message pull from the consumer information
+     */
+    public void configure(ConsumerInfo info) {
+        setConsumerId(info.getConsumerId());
+        setDestination(info.getDestination());
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * An optional correlation ID which could be used by a broker to decide which messages are pulled
+     * on demand from a queue for a consumer
+     *
+     * @openwire:property version=3
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+
+    /**
+     * An optional message ID which could be used by a broker to decide which messages are pulled
+     * on demand from a queue for a consumer
+     *
+     * @openwire:property version=3
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/NetworkBridgeFilter.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.util.Arrays;
+
+import org.apache.activemq.apollo.filter.BooleanExpression;
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.filter.Filterable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller code="91"
+ * @version $Revision: 1.12 $
+ */
+public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
+    static final Log LOG = LogFactory.getLog(NetworkBridgeFilter.class);
+
+    private BrokerId networkBrokerId;
+    private int networkTTL;
+
+    public NetworkBridgeFilter() {
+    }
+
+    public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
+        this.networkBrokerId = remoteBrokerPath;
+        this.networkTTL = networkTTL;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public boolean matches(Filterable filterable) throws FilterException {
+        return matchesForwardingFilter((Message)filterable);
+    }
+
+    public Object evaluate(Filterable filterable) throws FilterException {
+        return matches(filterable) ? Boolean.TRUE : Boolean.FALSE;
+    }
+
+    protected boolean matchesForwardingFilter(Message message) {
+
+        if (contains(message.getBrokerPath(), networkBrokerId)) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message all ready routed once through this broker ("
+                        + networkBrokerId + "), path: "
+                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
+            }
+            return false;
+        }
+
+        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+
+        if (hops >= networkTTL) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
+            }
+            return false;
+        }
+
+        // Don't propagate advisory messages about network subscriptions
+        if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
+            ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
+            hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
+            if (hops >= networkTTL) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
+        if (brokerPath != null && brokerId != null) {
+            for (int i = 0; i < brokerPath.length; i++) {
+                if (brokerId.equals(brokerPath[i])) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getNetworkTTL() {
+        return networkTTL;
+    }
+
+    public void setNetworkTTL(int networkTTL) {
+        this.networkTTL = networkTTL;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId getNetworkBrokerId() {
+        return networkBrokerId;
+    }
+
+    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
+        this.networkBrokerId = remoteBrokerPath;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Represents a partial command; a large command that has been split up into
+ * pieces.
+ * 
+ * @openwire:marshaller code="60"
+ * @version $Revision: 920306 $
+ */
+public class PartialCommand implements Command {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND;
+
+    private int commandId;
+    private byte[] data;
+
+    private transient Endpoint from;
+    private transient Endpoint to;
+
+    public PartialCommand() {
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getCommandId() {
+        return commandId;
+    }
+
+    public void setCommandId(int commandId) {
+        this.commandId = commandId;
+    }
+
+    /**
+     * The data for this part of the command
+     * 
+     * @openwire:property version=1 mandatory=true
+     */
+    public byte[] getData() {
+        return data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    public Endpoint getTo() {
+        return to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+    
+    public boolean isConnectionControl() {
+        return false;
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public String toString() {
+        int size = 0;
+        if (data != null) {
+            size = data.length;
+        }
+        return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
+    }   
+    
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * A ProducerAck command is sent by a broker to a producer to let it know it has
+ * received and processed messages that it has produced. The producer will be
+ * flow controlled if it does not receive ProducerAck commands back from the
+ * broker.
+ * 
+ * @openwire:marshaller code="19" version="3"
+ * @version $Revision: 1.11 $
+ */
+public class ProducerAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK;
+
+    protected ProducerId producerId;
+    protected int size;
+
+    public ProducerAck() {
+    }
+
+    public ProducerAck(ProducerId producerId, int size) {
+        this.producerId = producerId;
+        this.size = size;
+    }
+
+    public void copy(ProducerAck copy) {
+        super.copy(copy);
+        copy.producerId = producerId;
+        copy.size = size;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processProducerAck(this);
+    }
+
+    /**
+     * The producer id that this ack message is destined for.
+     * 
+     * @openwire:property version=3
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * The number of bytes that are being acked.
+     * 
+     * @openwire:property version=3
+     */
+    public int getSize() {
+        return size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * @openwire:marshaller code="123"
+ */
+public class ProducerId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
+
+    protected String connectionId;
+    protected long sessionId;
+    protected long value;
+
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient SessionId parentId;
+
+    public ProducerId() {
+    }
+
+    public ProducerId(SessionId sessionId, long producerId) {
+        this.connectionId = sessionId.getConnectionId();
+        this.sessionId = sessionId.getValue();
+        this.value = producerId;
+    }
+
+    public ProducerId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.sessionId = id.getSessionId();
+        this.value = id.getValue();
+    }
+
+    public ProducerId(String producerKey) {
+        // Parse off the producerId
+        int p = producerKey.lastIndexOf(":");
+        if (p >= 0) {
+            value = Long.parseLong(producerKey.substring(p + 1));
+            producerKey = producerKey.substring(0, p);
+        }
+        setProducerSessionKey(producerKey);
+    }
+
+    public SessionId getParentId() {
+        if (parentId == null) {
+            parentId = new SessionId(this);
+        }
+        return parentId;
+    }
+
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != ProducerId.class) {
+            return false;
+        }
+        ProducerId id = (ProducerId)o;
+        return sessionId == id.sessionId && value == id.value && connectionId.equals(id.connectionId);
+    }
+
+    /**
+     * @param sessionKey
+     */
+    private void setProducerSessionKey(String sessionKey) {
+        // Parse off the value
+        int p = sessionKey.lastIndexOf(":");
+        if (p >= 0) {
+            sessionId = Long.parseLong(sessionKey.substring(p + 1));
+            sessionKey = sessionKey.substring(0, p);
+        }
+        // The rest is the value
+        connectionId = sessionKey;
+    }
+
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + sessionId + ":" + value;
+        }
+        return key;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long producerId) {
+        this.value = producerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="6"
+ * @version $Revision: 1.13 $
+ */
+public class ProducerInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
+
+    protected ProducerId producerId;
+    protected ActiveMQDestination destination;
+    protected BrokerId[] brokerPath;
+    protected boolean dispatchAsync;
+    protected int windowSize;
+
+    public ProducerInfo() {
+    }
+
+    public ProducerInfo(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    public ProducerInfo(SessionInfo sessionInfo, long producerId) {
+        this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
+    }
+
+    public ProducerInfo copy() {
+        ProducerInfo info = new ProducerInfo();
+        copy(info);
+        return info;
+    }
+
+    public void copy(ProducerInfo info) {
+        super.copy(info);
+        info.producerId = producerId;
+        info.destination = destination;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getProducerId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddProducer(this);
+    }
+
+    /**
+     * If the broker should dispatch messages from this producer async. Since
+     * sync dispatch could potentally block the producer thread, this could be
+     * an important setting for the producer.
+     * 
+     * @openwire:property version=2
+     */
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    public void setDispatchAsync(boolean dispatchAsync) {
+        this.dispatchAsync = dispatchAsync;
+    }
+
+    /**
+     * Used to configure the producer window size. A producer will send up to
+     * the configured window size worth of payload data to the broker before
+     * waiting for an Ack that allows him to send more.
+     * 
+     * @openwire:property version=3
+     */
+    public int getWindowSize() {
+        return windowSize;
+    }
+
+    public void setWindowSize(int windowSize) {
+        this.windowSize = windowSize;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Removes a consumer, producer, session or connection.
+ * 
+ * @openwire:marshaller code="12"
+ */
+public class RemoveInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
+
+    protected DataStructure objectId;
+    protected long lastDeliveredSequenceId;
+
+    public RemoveInfo() {
+    }
+
+    public RemoveInfo(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public DataStructure getObjectId() {
+        return objectId;
+    }
+
+    public void setObjectId(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    /**
+     * @openwire:property version=5 cache=false
+     */
+    public long getLastDeliveredSequenceId() {
+        return lastDeliveredSequenceId;
+    }
+
+    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
+        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        switch (objectId.getDataStructureType()) {
+        case ConnectionId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveConnection(this, (ConnectionId)objectId, lastDeliveredSequenceId);
+        case SessionId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveSession(this, (SessionId)objectId, lastDeliveredSequenceId);
+        case ConsumerId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveConsumer(this, (ConsumerId)objectId, lastDeliveredSequenceId);
+        case ProducerId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveProducer(this, (ProducerId)objectId);
+        default:
+            throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
+        }
+    }
+
+    /**
+     * Returns true if this event is for a removed connection
+     */
+    public boolean isConnectionRemove() {
+        return objectId.getDataStructureType() == ConnectionId.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns true if this event is for a removed session
+     */
+    public boolean isSessionRemove() {
+        return objectId.getDataStructureType() == SessionId.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns true if this event is for a removed consumer
+     */
+    public boolean isConsumerRemove() {
+        return objectId.getDataStructureType() == ConsumerId.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns true if this event is for a removed producer
+     */
+    public boolean isProducerRemove() {
+        return objectId.getDataStructureType() == ProducerId.DATA_STRUCTURE_TYPE;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="9"
+ * @version $Revision: 1.7 $
+ */
+public class RemoveSubscriptionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
+
+    protected ConnectionId connectionId;
+    protected String clientId;
+    protected String subscriptionName;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processRemoveSubscription(this);
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * A general purpose replay command for some kind of producer where ranges of
+ * messages are asked to be replayed. This command is typically used over a
+ * non-reliable transport such as UDP or multicast but could also be used on
+ * TCP/IP if a socket has been re-established.
+ * 
+ * @openwire:marshaller code="65"
+ */
+public class ReplayCommand extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
+
+    private String producerId;
+    private int firstAckNumber;
+    private int lastAckNumber;
+    private int firstNakNumber;
+    private int lastNakNumber;
+
+    public ReplayCommand() {
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    /**
+     * Is used to uniquely identify the producer of the sequence
+     * 
+     * @openwire:property version=1 cache=false
+     */
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getFirstAckNumber() {
+        return firstAckNumber;
+    }
+
+    /**
+     * Is used to specify the first sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     * 
+     * @openwire:property version=1
+     */
+    public void setFirstAckNumber(int firstSequenceNumber) {
+        this.firstAckNumber = firstSequenceNumber;
+    }
+
+    public int getLastAckNumber() {
+        return lastAckNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     * 
+     * @openwire:property version=1
+     */
+    public void setLastAckNumber(int lastSequenceNumber) {
+        this.lastAckNumber = lastSequenceNumber;
+    }
+
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    /**
+     * Is used to specify the first sequence number to be replayed
+     * 
+     * @openwire:property version=1
+     */
+    public int getFirstNakNumber() {
+        return firstNakNumber;
+    }
+
+    public void setFirstNakNumber(int firstNakNumber) {
+        this.firstNakNumber = firstNakNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number to be replayed
+     * 
+     * @openwire:property version=1
+     */
+    public int getLastNakNumber() {
+        return lastNakNumber;
+    }
+
+    public void setLastNakNumber(int lastNakNumber) {
+        this.lastNakNumber = lastNakNumber;
+    }
+
+    public String toString() {
+        return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}";
+    }
+    
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * @openwire:marshaller code="30"
+ * @version $Revision: 1.6 $
+ */
+public class Response extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
+    int correlationId;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(int responseId) {
+        this.correlationId = responseId;
+    }
+
+    public boolean isResponse() {
+        return true;
+    }
+
+    public boolean isException() {
+        return false;
+    }
+
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return null;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
------------------------------------------------------------------------------
    svn:executable = *