You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC

svn commit: r1097189 [40/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * 
+ * @openwire:marshaller code="121"
+ */
+public class SessionId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
+
+    protected String connectionId;
+    protected long value;
+
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient ConnectionId parentId;
+
+    public SessionId() {
+    }
+
+    public SessionId(ConnectionId connectionId, long sessionId) {
+        this.connectionId = connectionId.getValue();
+        this.value = sessionId;
+    }
+
+    public SessionId(SessionId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getValue();
+    }
+
+    public SessionId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    public SessionId(ConsumerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    public ConnectionId getParentId() {
+        if (parentId == null) {
+            parentId = new ConnectionId(this);
+        }
+        return parentId;
+    }
+
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = connectionId.hashCode() ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != SessionId.class) {
+            return false;
+        }
+        SessionId id = (SessionId)o;
+        return value == id.value && connectionId.equals(id.connectionId);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long sessionId) {
+        this.value = sessionId;
+    }
+
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + value;
+        }
+        return key;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="4"
+ * @version $Revision: 1.13 $
+ */
+public class SessionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
+
+    protected SessionId sessionId;
+
+    public SessionInfo() {
+        sessionId = new SessionId();
+    }
+
+    public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
+        this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId);
+    }
+
+    public SessionInfo(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getSessionId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddSession(this);
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="11"
+ */
+public class ShutdownInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processShutdown(this);
+    }
+
+    public boolean isShutdownInfo() {
+        return true;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+
+import org.apache.activemq.apollo.util.IntrospectionSupport;
+
+/**
+ * Used to represent a durable subscription.
+ * 
+ * @openwire:marshaller code="55"
+ * @version $Revision: 1.6 $
+ */
+public class SubscriptionInfo implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+
+    protected ActiveMQDestination subscribedDestination;
+    protected ActiveMQDestination destination;
+    protected String clientId;
+    protected String subscriptionName;
+    protected String selector;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * This is the a resolved destination that the subscription is receiving
+     * messages from. This will never be a pattern or a composite destination.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
+    public int hashCode() {
+        int h1 = clientId != null ? clientId.hashCode() : -1;
+        int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
+        return h1 ^ h2;
+    }
+
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof SubscriptionInfo) {
+            SubscriptionInfo other = (SubscriptionInfo)obj;
+            result = (clientId == null && other.clientId == null || clientId != null
+                                                                    && other.clientId != null
+                                                                    && clientId.equals(other.clientId))
+                     && (subscriptionName == null && other.subscriptionName == null || subscriptionName != null
+                                                                                       && other.subscriptionName != null
+                                                                                       && subscriptionName
+                                                                                           .equals(other.subscriptionName));
+        }
+        return result;
+    }
+
+    /**
+     * The destination the client originally subscribed to.. This may not match
+     * the {@see getDestination} method if the subscribed destination uses
+     * patterns or composites.
+     * 
+     * If the subscribed destinationis not set, this just ruturns the
+     * desitination.
+     * 
+     * @openwire:property version=3
+     */
+    public ActiveMQDestination getSubscribedDestination() {
+        if (subscribedDestination == null) {
+            return getDestination();
+        }
+        return subscribedDestination;
+    }
+
+    public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+        this.subscribedDestination = subscribedDestination;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.6 $
+ */
+public abstract class TransactionId implements DataStructure {
+
+    public abstract boolean isXATransaction();
+    public abstract boolean isLocalTransaction();
+    public abstract String getTransactionKey();
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.IOException;
+
+/**
+ * 
+ * @openwire:marshaller code="7"
+ * @version $Revision: 1.10 $
+ */
+public class TransactionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;
+
+    public static final byte BEGIN = 0;
+    public static final byte PREPARE = 1;
+    public static final byte COMMIT_ONE_PHASE = 2;
+    public static final byte COMMIT_TWO_PHASE = 3;
+    public static final byte ROLLBACK = 4;
+    public static final byte RECOVER = 5;
+    public static final byte FORGET = 6;
+    public static final byte END = 7;
+
+    protected byte type;
+    protected ConnectionId connectionId;
+    protected TransactionId transactionId;
+
+    public TransactionInfo() {
+    }
+
+    public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
+        this.connectionId = connectionId;
+        this.transactionId = transactionId;
+        this.type = type;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        switch (type) {
+        case TransactionInfo.BEGIN:
+            return visitor.processBeginTransaction(this);
+        case TransactionInfo.END:
+            return visitor.processEndTransaction(this);
+        case TransactionInfo.PREPARE:
+            return visitor.processPrepareTransaction(this);
+        case TransactionInfo.COMMIT_ONE_PHASE:
+            return visitor.processCommitTransactionOnePhase(this);
+        case TransactionInfo.COMMIT_TWO_PHASE:
+            return visitor.processCommitTransactionTwoPhase(this);
+        case TransactionInfo.ROLLBACK:
+            return visitor.processRollbackTransaction(this);
+        case TransactionInfo.RECOVER:
+            return visitor.processRecoverTransactions(this);
+        case TransactionInfo.FORGET:
+            return visitor.processForgetTransaction(this);
+        default:
+            throw new IOException("Transaction info type unknown: " + type);
+        }
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ * @openwire:marshaller code="1"
+ */
+public class WireFormatInfo implements Command, MarshallAware {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
+    private static final int MAX_PROPERTY_SIZE = 1024 * 4;
+    private static final byte MAGIC[] = new byte[] {'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+    protected byte magic[] = MAGIC;
+    protected int version;
+    protected Buffer marshalledProperties;
+
+    protected transient Map<String, Object> properties;
+    private transient Endpoint from;
+    private transient Endpoint to;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isWireFormatInfo() {
+        return true;
+    }
+
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 size=8 testSize=-1
+     */
+    public byte[] getMagic() {
+        return magic;
+    }
+
+    public void setMagic(byte[] magic) {
+        this.magic = magic;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(Buffer marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * The endpoint within the transport where this message came from.
+     */
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * The endpoint within the transport where this message is going to - null
+     * means all endpoints.
+     */
+    public Endpoint getTo() {
+        return to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+
+    // ////////////////////
+    // 
+    // Implementation Methods.
+    //
+    // ////////////////////
+
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return Collections.EMPTY_MAP;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                properties = new HashMap<String, Object>();
+            } else {
+                properties = unmarsallProperties(marshalledProperties);
+                marshalledProperties = null;
+            }
+        }
+    }
+
+    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+    }
+
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            MarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toBuffer();
+        }
+    }
+
+    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public boolean isValid() {
+        return magic != null && Arrays.equals(magic, MAGIC);
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isCacheEnabled() throws IOException {
+        return Boolean.TRUE == getProperty("CacheEnabled");
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) throws IOException {
+        setProperty("CacheEnabled", cacheEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isStackTraceEnabled() throws IOException {
+        return Boolean.TRUE == getProperty("StackTraceEnabled");
+    }
+
+    public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
+        setProperty("StackTraceEnabled", stackTraceEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isTcpNoDelayEnabled() throws IOException {
+        return Boolean.TRUE == getProperty("TcpNoDelayEnabled");
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
+        setProperty("TcpNoDelayEnabled", tcpNoDelayEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isSizePrefixDisabled() throws IOException {
+        return Boolean.TRUE == getProperty("SizePrefixDisabled");
+    }
+
+    public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
+        setProperty("SizePrefixDisabled", prefixPacketSize ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isTightEncodingEnabled() throws IOException {
+        return Boolean.TRUE == getProperty("TightEncodingEnabled");
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
+        setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public long getMaxInactivityDuration() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDuration");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+        setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
+    }
+    
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
+    }
+    
+   
+
+    /**
+     * @throws IOException
+     */
+    public int getCacheSize() throws IOException {
+        Integer i = (Integer)getProperty("CacheSize");
+        return i == null ? 0 : i.intValue();
+    }
+
+    public void setCacheSize(int cacheSize) throws IOException {
+        setProperty("CacheSize", new Integer(cacheSize));
+    }
+
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return visitor.processWireFormat(this);
+    }
+
+    public String toString() {
+        Map<String, Object> p = null;
+        try {
+            p = getProperties();
+        } catch (IOException ignore) {
+        }
+        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+    }
+
+    private String toString(byte[] data) {
+        StringBuffer sb = new StringBuffer();
+        sb.append('[');
+        for (int i = 0; i < data.length; i++) {
+            if (i != 0) {
+                sb.append(',');
+            }
+            sb.append((char)data[i]);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    // /////////////////////////////////////////////////////////////
+    //
+    // This are not implemented.
+    //
+    // /////////////////////////////////////////////////////////////
+
+    public void setCommandId(int value) {
+    }
+
+    public int getCommandId() {
+        return 0;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.HexSupport;
+
+import javax.transaction.xa.Xid;
+import java.util.Arrays;
+
+/**
+ * @openwire:marshaller code="112"
+ * @version $Revision: 1.6 $
+ */
+public class XATransactionId extends TransactionId implements Xid, Comparable {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_XA_TRANSACTION_ID;
+
+    private int formatId;
+    private byte[] branchQualifier;
+    private byte[] globalTransactionId;
+
+    private transient int hash;
+    private transient String transactionKey;
+
+    public XATransactionId() {
+    }
+
+    public XATransactionId(Xid xid) {
+        this.formatId = xid.getFormatId();
+        this.globalTransactionId = xid.getGlobalTransactionId();
+        this.branchQualifier = xid.getBranchQualifier();
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public synchronized String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBuffer(new Buffer(globalTransactionId)) + ":"
+                             + HexSupport.toHexFromBuffer(new Buffer(branchQualifier));
+        }
+        return transactionKey;
+    }
+
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    public boolean isXATransaction() {
+        return true;
+    }
+
+    public boolean isLocalTransaction() {
+        return false;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getFormatId() {
+        return formatId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getBranchQualifier() {
+        return branchQualifier;
+    }
+
+    public void setBranchQualifier(byte[] branchQualifier) {
+        this.branchQualifier = branchQualifier;
+        this.hash = 0;
+    }
+
+    public void setFormatId(int formatId) {
+        this.formatId = formatId;
+        this.hash = 0;
+    }
+
+    public void setGlobalTransactionId(byte[] globalTransactionId) {
+        this.globalTransactionId = globalTransactionId;
+        this.hash = 0;
+    }
+
+    public int hashCode() {
+        if (hash == 0) {
+            hash = formatId;
+            hash = hash(globalTransactionId, hash);
+            hash = hash(branchQualifier, hash);
+            if (hash == 0) {
+                hash = 0xaceace;
+            }
+        }
+        return hash;
+    }
+
+    private static int hash(byte[] bytes, int hash) {
+        int size = bytes.length;
+        for (int i = 0; i < size; i++) {
+            hash ^= bytes[i] << ((i % 4) * 8);
+        }
+        return hash;
+    }
+
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return false;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return xid.formatId == formatId && Arrays.equals(xid.globalTransactionId, globalTransactionId)
+               && Arrays.equals(xid.branchQualifier, branchQualifier);
+    }
+
+    public int compareTo(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return -1;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return getTransactionKey().compareTo(xid.getTransactionKey());
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+Command objects used via the Command Pattern to communicate among nodes
+</p>
+
+</body>
+</html>

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.dto;
+
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="openwire_connection_status")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class OpenwireConnectionStatusDTO extends ConnectionStatusDTO {
+
+    /**
+     * The version of the STOMP protocol being used.
+     */
+	@XmlAttribute
+	public int protocol_version;
+
+    /**
+     * The connected user
+     */
+	@XmlAttribute
+	public String user;
+
+    /**
+     * What the connection is currently waiting on
+     */
+    @XmlAttribute(name="waiting-on")
+	public String waiting_on;
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+/**
+ * The fixed version of the UTF8 encoding function. Some older JVM's UTF8
+ * encoding function breaks when handling large strings.
+ * 
+ */
+public final class MarshallingSupport {
+
+    public static final byte NULL = 0;
+    public static final byte BOOLEAN_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte CHAR_TYPE = 3;
+    public static final byte SHORT_TYPE = 4;
+    public static final byte INTEGER_TYPE = 5;
+    public static final byte LONG_TYPE = 6;
+    public static final byte DOUBLE_TYPE = 7;
+    public static final byte FLOAT_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BYTE_ARRAY_TYPE = 10;
+    public static final byte MAP_TYPE = 11;
+    public static final byte LIST_TYPE = 12;
+    public static final byte BIG_STRING_TYPE = 13;
+
+    private MarshallingSupport() {
+    }
+    
+    public static void marshalPrimitiveMap(Map map, DataOutputStream out) throws IOException {
+        if (map == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(map.size());
+            for (Iterator iter = map.keySet().iterator(); iter.hasNext();) {
+                String name = (String)iter.next();
+                out.writeUTF(name);
+                Object value = map.get(name);
+                marshalPrimitive(out, value);
+            }
+        }
+    }
+
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+    }
+
+    /**
+     * @param in
+     * @return
+     * @throws IOException
+     * @throws IOException
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+        int size = in.readInt();
+        if (size > maxPropertySize) {
+            throw new IOException("Primitive map is larger than the allowed size: " + size);
+        }
+        if (size < 0) {
+            return null;
+        } else {
+            Map<String, Object> rc = new HashMap<String, Object>(size);
+            for (int i = 0; i < size; i++) {
+                String name = in.readUTF();
+                rc.put(name, unmarshalPrimitive(in));
+            }
+            return rc;
+        }
+
+    }
+
+    public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+        out.writeInt(list.size());
+        for (Iterator iter = list.iterator(); iter.hasNext();) {
+            Object element = (Object)iter.next();
+            marshalPrimitive(out, element);
+        }
+    }
+
+    public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+        int size = in.readInt();
+        List<Object> answer = new ArrayList<Object>(size);
+        while (size-- > 0) {
+            answer.add(unmarshalPrimitive(in));
+        }
+        return answer;
+    }
+
+    public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+        if (value == null) {
+            marshalNull(out);
+        } else if (value.getClass() == Boolean.class) {
+            marshalBoolean(out, ((Boolean)value).booleanValue());
+        } else if (value.getClass() == Byte.class) {
+            marshalByte(out, ((Byte)value).byteValue());
+        } else if (value.getClass() == Character.class) {
+            marshalChar(out, ((Character)value).charValue());
+        } else if (value.getClass() == Short.class) {
+            marshalShort(out, ((Short)value).shortValue());
+        } else if (value.getClass() == Integer.class) {
+            marshalInt(out, ((Integer)value).intValue());
+        } else if (value.getClass() == Long.class) {
+            marshalLong(out, ((Long)value).longValue());
+        } else if (value.getClass() == Float.class) {
+            marshalFloat(out, ((Float)value).floatValue());
+        } else if (value.getClass() == Double.class) {
+            marshalDouble(out, ((Double)value).doubleValue());
+        } else if (value.getClass() == byte[].class) {
+            marshalByteArray(out, (byte[])value);
+        } else if (value.getClass() == String.class) {
+            marshalString(out, (String)value);
+        } else if (value instanceof Map) {
+            out.writeByte(MAP_TYPE);
+            marshalPrimitiveMap((Map)value, out);
+        } else if (value instanceof List) {
+            out.writeByte(LIST_TYPE);
+            marshalPrimitiveList((List)value, out);
+        } else {
+            throw new IOException("Object is not a primitive: " + value);
+        }
+    }
+
+    public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+        Object value = null;
+        byte type = in.readByte();
+        switch (type) {
+        case BYTE_TYPE:
+            value = Byte.valueOf(in.readByte());
+            break;
+        case BOOLEAN_TYPE:
+            value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+            break;
+        case CHAR_TYPE:
+            value = Character.valueOf(in.readChar());
+            break;
+        case SHORT_TYPE:
+            value = Short.valueOf(in.readShort());
+            break;
+        case INTEGER_TYPE:
+            value = Integer.valueOf(in.readInt());
+            break;
+        case LONG_TYPE:
+            value = Long.valueOf(in.readLong());
+            break;
+        case FLOAT_TYPE:
+            value = new Float(in.readFloat());
+            break;
+        case DOUBLE_TYPE:
+            value = new Double(in.readDouble());
+            break;
+        case BYTE_ARRAY_TYPE:
+            value = new byte[in.readInt()];
+            in.readFully((byte[])value);
+            break;
+        case STRING_TYPE:
+            value = in.readUTF();
+            break;
+        case BIG_STRING_TYPE:
+            value = readUTF8(in);
+            break;
+        case MAP_TYPE:
+            value = unmarshalPrimitiveMap(in);
+            break;
+        case LIST_TYPE:
+            value = unmarshalPrimitiveList(in);
+            break;
+        case NULL:
+            value = null;
+            break;
+        default:
+            throw new IOException("Unknown primitive type: " + type);
+        }
+        return value;
+    }
+
+    public static void marshalNull(DataOutputStream out) throws IOException {
+        out.writeByte(NULL);
+    }
+
+    public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+        out.writeByte(BOOLEAN_TYPE);
+        out.writeBoolean(value);
+    }
+
+    public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+        out.writeByte(BYTE_TYPE);
+        out.writeByte(value);
+    }
+
+    public static void marshalChar(DataOutputStream out, char value) throws IOException {
+        out.writeByte(CHAR_TYPE);
+        out.writeChar(value);
+    }
+
+    public static void marshalShort(DataOutputStream out, short value) throws IOException {
+        out.writeByte(SHORT_TYPE);
+        out.writeShort(value);
+    }
+
+    public static void marshalInt(DataOutputStream out, int value) throws IOException {
+        out.writeByte(INTEGER_TYPE);
+        out.writeInt(value);
+    }
+
+    public static void marshalLong(DataOutputStream out, long value) throws IOException {
+        out.writeByte(LONG_TYPE);
+        out.writeLong(value);
+    }
+
+    public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+        out.writeByte(FLOAT_TYPE);
+        out.writeFloat(value);
+    }
+
+    public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+        out.writeByte(DOUBLE_TYPE);
+        out.writeDouble(value);
+    }
+
+    public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+        marshalByteArray(out, value, 0, value.length);
+    }
+
+    public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+        out.writeByte(BYTE_ARRAY_TYPE);
+        out.writeInt(length);
+        out.write(value, offset, length);
+    }
+
+    public static void marshalString(DataOutputStream out, String s) throws IOException {
+        // If it's too big, out.writeUTF may not able able to write it out.
+        if (s.length() < Short.MAX_VALUE / 4) {
+            out.writeByte(STRING_TYPE);
+            out.writeUTF(s);
+        } else {
+            out.writeByte(BIG_STRING_TYPE);
+            writeUTF8(out, s);
+        }
+    }
+
+    public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+        if (text != null) {
+            int strlen = text.length();
+            int utflen = 0;
+            char[] charr = new char[strlen];
+            int c = 0;
+            int count = 0;
+
+            text.getChars(0, strlen, charr, 0);
+
+            for (int i = 0; i < strlen; i++) {
+                c = charr[i];
+                if ((c >= 0x0001) && (c <= 0x007F)) {
+                    utflen++;
+                } else if (c > 0x07FF) {
+                    utflen += 3;
+                } else {
+                    utflen += 2;
+                }
+            }
+            // TODO diff: Sun code - removed
+            byte[] bytearr = new byte[utflen + 4]; // TODO diff: Sun code
+            bytearr[count++] = (byte)((utflen >>> 24) & 0xFF); // TODO diff:
+            // Sun code
+            bytearr[count++] = (byte)((utflen >>> 16) & 0xFF); // TODO diff:
+            // Sun code
+            bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
+            bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+            for (int i = 0; i < strlen; i++) {
+                c = charr[i];
+                if ((c >= 0x0001) && (c <= 0x007F)) {
+                    bytearr[count++] = (byte)c;
+                } else if (c > 0x07FF) {
+                    bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+                    bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+                    bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+                } else {
+                    bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+                    bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+                }
+            }
+            dataOut.write(bytearr);
+
+        } else {
+            dataOut.writeInt(-1);
+        }
+    }
+
+    public static String readUTF8(DataInput dataIn) throws IOException {
+        int utflen = dataIn.readInt(); // TODO diff: Sun code
+        if (utflen > -1) {
+            StringBuffer str = new StringBuffer(utflen);
+            byte bytearr[] = new byte[utflen];
+            int c;
+            int char2;
+            int char3;
+            int count = 0;
+
+            dataIn.readFully(bytearr, 0, utflen);
+
+            while (count < utflen) {
+                c = bytearr[count] & 0xff;
+                switch (c >> 4) {
+                case 0:
+                case 1:
+                case 2:
+                case 3:
+                case 4:
+                case 5:
+                case 6:
+                case 7:
+                    /* 0xxxxxxx */
+                    count++;
+                    str.append((char)c);
+                    break;
+                case 12:
+                case 13:
+                    /* 110x xxxx 10xx xxxx */
+                    count += 2;
+                    if (count > utflen) {
+                        throw new UTFDataFormatException();
+                    }
+                    char2 = bytearr[count - 1];
+                    if ((char2 & 0xC0) != 0x80) {
+                        throw new UTFDataFormatException();
+                    }
+                    str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+                    break;
+                case 14:
+                    /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                    count += 3;
+                    if (count > utflen) {
+                        throw new UTFDataFormatException();
+                    }
+                    char2 = bytearr[count - 2]; // TODO diff: Sun code
+                    char3 = bytearr[count - 1]; // TODO diff: Sun code
+                    if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+                        throw new UTFDataFormatException();
+                    }
+                    str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+                    break;
+                default:
+                    /* 10xx xxxx, 1111 xxxx */
+                    throw new UTFDataFormatException();
+                }
+            }
+            // The number of chars produced may be less than utflen
+            return new String(str);
+        } else {
+            return null;
+        }
+    }
+
+    public static String propertiesToString(Properties props) throws IOException {
+        String result = "";
+        if (props != null) {
+            DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+            props.store(dataOut, "");
+            result = new String(dataOut.getData(), 0, dataOut.size());
+            dataOut.close();
+        }
+        return result;
+    }
+
+    public static Properties stringToProperties(String str) throws IOException {
+        Properties result = new Properties();
+        if (str != null && str.length() > 0) {
+            DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes());
+            result.load(dataIn);
+            dataIn.close();
+        }
+        return result;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class OpenwireException extends Exception {
+
+    public OpenwireException() {
+    }
+
+    public OpenwireException(String s) {
+        super(s);
+    }
+
+    public OpenwireException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public OpenwireException(Throwable throwable) {
+        super(throwable);
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Settings {
+
+    static public boolean enable_compression() {
+        return true;
+    }
+
+    static public boolean enable_nested_map_and_list() {
+        return true;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support;
+
+import java.util.Date;
+import java.util.HashMap;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
+
+public final class TypeConversionSupport {
+
+    static class ConversionKey {
+        final Class from;
+        final Class to;
+        final int hashCode;
+
+        public ConversionKey(Class from, Class to) {
+            this.from = from;
+            this.to = to;
+            this.hashCode = from.hashCode() ^ (to.hashCode() << 1);
+        }
+
+        public boolean equals(Object o) {
+            ConversionKey x = (ConversionKey)o;
+            return x.from == from && x.to == to;
+        }
+
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+
+    interface Converter {
+        Object convert(Object value);
+    }
+
+    private static final HashMap<ConversionKey, Converter> CONVERSION_MAP = new HashMap<ConversionKey, Converter>();
+    static {
+        Converter toStringConverter = new Converter() {
+            public Object convert(Object value) {
+                return value.toString();
+            }
+        };
+        CONVERSION_MAP.put(new ConversionKey(Boolean.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Byte.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Short.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Integer.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Long.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Float.class, String.class), toStringConverter);
+        CONVERSION_MAP.put(new ConversionKey(Double.class, String.class), toStringConverter);
+
+        CONVERSION_MAP.put(new ConversionKey(String.class, Boolean.class), new Converter() {
+            public Object convert(Object value) {
+                return Boolean.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Byte.class), new Converter() {
+            public Object convert(Object value) {
+                return Byte.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Short.class), new Converter() {
+            public Object convert(Object value) {
+                return Short.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Integer.class), new Converter() {
+            public Object convert(Object value) {
+                return Integer.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Long.class), new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Float.class), new Converter() {
+            public Object convert(Object value) {
+                return Float.valueOf((String)value);
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, Double.class), new Converter() {
+            public Object convert(Object value) {
+                return Double.valueOf((String)value);
+            }
+        });
+
+        Converter longConverter = new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf(((Number)value).longValue());
+            }
+        };
+        CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter);
+        CONVERSION_MAP.put(new ConversionKey(Short.class, Long.class), longConverter);
+        CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter);
+        CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf(((Date)value).getTime());
+            }
+        });
+
+        Converter intConverter = new Converter() {
+            public Object convert(Object value) {
+                return Integer.valueOf(((Number)value).intValue());
+            }
+        };
+        CONVERSION_MAP.put(new ConversionKey(Byte.class, Integer.class), intConverter);
+        CONVERSION_MAP.put(new ConversionKey(Short.class, Integer.class), intConverter);
+
+        CONVERSION_MAP.put(new ConversionKey(Byte.class, Short.class), new Converter() {
+            public Object convert(Object value) {
+                return Short.valueOf(((Number)value).shortValue());
+            }
+        });
+
+        CONVERSION_MAP.put(new ConversionKey(Float.class, Double.class), new Converter() {
+            public Object convert(Object value) {
+                return new Double(((Number)value).doubleValue());
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, ActiveMQDestination.class), new Converter() {
+            public Object convert(Object value) {
+                return ActiveMQDestination.createDestination((String)value, ActiveMQDestination.QUEUE_TYPE);
+            }
+        });
+    }
+
+    private TypeConversionSupport() {
+    }
+
+    public static Object convert(Object value, Class clazz) {
+
+        assert value != null && clazz != null;
+
+        if (value.getClass() == clazz) {
+            return value;
+        }
+
+        Converter c = CONVERSION_MAP.get(new ConversionKey(value.getClass(), clazz));
+        if (c == null) {
+            return null;
+        }
+        return c.convert(value);
+
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.advisory;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
+import org.apache.activemq.apollo.openwire.command.ActiveMQTopic;
+
+public final class AdvisorySupport {
+
+    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer.";
+    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
+    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL="originBrokerURL";
+    public static final String MSG_PROPERTY_USAGE_NAME="usageName";
+    public static final String MSG_PROPERTY_CONSUMER_ID="consumerId";
+    public static final String MSG_PROPERTY_PRODUCER_ID="producerId";
+    public static final String MSG_PROPERTY_MESSAGE_ID="orignalMessageId";
+    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
+    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
+
+    private AdvisorySupport() {        
+    }
+    
+    public static ActiveMQTopic getConnectionAdvisoryTopic() {
+        return CONNECTION_ADVISORY_TOPIC;
+    }
+
+    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        } else {
+            return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        }
+    }
+
+    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        } else {
+            return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        }
+    }
+
+    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return getExpiredQueueMessageAdvisoryTopic(destination);
+        }
+        return getExpiredTopicMessageAdvisoryTopic(destination);
+    }
+
+    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = SLOW_CONSUMER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FAST_PRODUCER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+    }
+    
+    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FULL_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
+        switch (destination.getDestinationType()) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            return QUEUE_ADVISORY_TOPIC;
+        case ActiveMQDestination.TOPIC_TYPE:
+            return TOPIC_ADVISORY_TOPIC;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return TEMP_QUEUE_ADVISORY_TOPIC;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return TEMP_TOPIC_ADVISORY_TOPIC;
+        default:
+            throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+        }
+    }
+
+    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
+                   || destination.equals(TOPIC_ADVISORY_TOPIC);
+        }
+    }
+
+    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
+        }
+    }
+
+    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.equals(CONNECTION_ADVISORY_TOPIC);
+        }
+    }
+
+    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
+        }
+    }
+
+    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFullAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
+        }
+    }
+
+    /**
+     * Returns the agent topic which is used to send commands to the broker
+     */
+    public static ActiveMQDestination getAgentDestination() {
+        return AGENT_TOPIC_DESTINATION;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
------------------------------------------------------------------------------
    svn:executable = *