You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC

svn commit: r780773 [7/31] - in /activemq/sandbox/activemq-flow: activemq-client/ activemq-client/src/main/java/org/ activemq-client/src/main/java/org/apache/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/a...

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * A general purpose replay command for some kind of producer where ranges of
+ * messages are asked to be replayed. This command is typically used over a
+ * non-reliable transport such as UDP or multicast but could also be used on
+ * TCP/IP if a socket has been re-established.
+ * 
+ * @openwire:marshaller code="65"
+ * @version $Revision: 563921 $
+ */
+public class ReplayCommand extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
+
+    private String producerId;
+    private int firstAckNumber;
+    private int lastAckNumber;
+    private int firstNakNumber;
+    private int lastNakNumber;
+
+    public ReplayCommand() {
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    /**
+     * Is used to uniquely identify the producer of the sequence
+     * 
+     * @openwire:property version=1 cache=false
+     */
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getFirstAckNumber() {
+        return firstAckNumber;
+    }
+
+    /**
+     * Is used to specify the first sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     * 
+     * @openwire:property version=1
+     */
+    public void setFirstAckNumber(int firstSequenceNumber) {
+        this.firstAckNumber = firstSequenceNumber;
+    }
+
+    public int getLastAckNumber() {
+        return lastAckNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     * 
+     * @openwire:property version=1
+     */
+    public void setLastAckNumber(int lastSequenceNumber) {
+        this.lastAckNumber = lastSequenceNumber;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    /**
+     * Is used to specify the first sequence number to be replayed
+     * 
+     * @openwire:property version=1
+     */
+    public int getFirstNakNumber() {
+        return firstNakNumber;
+    }
+
+    public void setFirstNakNumber(int firstNakNumber) {
+        this.firstNakNumber = firstNakNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number to be replayed
+     * 
+     * @openwire:property version=1
+     */
+    public int getLastNakNumber() {
+        return lastNakNumber;
+    }
+
+    public void setLastNakNumber(int lastNakNumber) {
+        this.lastNakNumber = lastNakNumber;
+    }
+
+    public String toString() {
+        return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}";
+    }
+    
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * @openwire:marshaller code="30"
+ * @version $Revision: 1.6 $
+ */
+public class Response extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
+    // Id carried by this response to correlate it with another command.
+    int correlationId;
+
+    /**
+     * @return the data structure type code, {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getCorrelationId() {
+        return this.correlationId;
+    }
+
+    /**
+     * @param responseId the correlation id to store on this response
+     */
+    public void setCorrelationId(int responseId) {
+        this.correlationId = responseId;
+    }
+
+    /**
+     * @return always true
+     */
+    public boolean isResponse() {
+        return true;
+    }
+
+    /**
+     * @return always false for this class
+     */
+    public boolean isException() {
+        return false;
+    }
+
+    /**
+     * The visitor is not consulted for responses.
+     *
+     * @return always null
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * 
+ * @openwire:marshaller code="121"
+ * @version $Revision$
+ */
+public class SessionId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
+
+    protected String connectionId;
+    protected long value;
+
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient ConnectionId parentId;
+
+    public SessionId() {
+    }
+
+    public SessionId(ConnectionId connectionId, long sessionId) {
+        this.connectionId = connectionId.getValue();
+        this.value = sessionId;
+    }
+
+    public SessionId(SessionId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getValue();
+    }
+
+    public SessionId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    public SessionId(ConsumerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    public ConnectionId getParentId() {
+        if (parentId == null) {
+            parentId = new ConnectionId(this);
+        }
+        return parentId;
+    }
+
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = connectionId.hashCode() ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != SessionId.class) {
+            return false;
+        }
+        SessionId id = (SessionId)o;
+        return value == id.value && connectionId.equals(id.connectionId);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long sessionId) {
+        this.value = sessionId;
+    }
+
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + value;
+        }
+        return key;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="4"
+ * @version $Revision: 1.13 $
+ */
+public class SessionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
+
+    protected SessionId sessionId;
+
+    public SessionInfo() {
+        sessionId = new SessionId();
+    }
+
+    public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
+        this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId);
+    }
+
+    public SessionInfo(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getSessionId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddSession(this);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="11"
+ * @version $Revision$
+ */
+public class ShutdownInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processShutdown(this);
+    }
+
+    public boolean isShutdownInfo() {
+        return true;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Used to represent a durable subscription.
+ * 
+ * @openwire:marshaller code="55"
+ * @version $Revision: 1.6 $
+ */
+public class SubscriptionInfo implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+
+    protected ActiveMQDestination subscribedDestination;
+    protected ActiveMQDestination destination;
+    protected String clientId;
+    protected String subscriptionName;
+    protected String selector;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * This is the a resolved destination that the subscription is receiving
+     * messages from. This will never be a pattern or a composite destination.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @deprecated
+     */
+    public String getSubcriptionName() {
+        return subscriptionName;
+    }
+
+    /**
+     * @param subscriptionName *
+     * @deprecated
+     */
+    public void setSubcriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
+    public int hashCode() {
+        int h1 = clientId != null ? clientId.hashCode() : -1;
+        int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
+        return h1 ^ h2;
+    }
+
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof SubscriptionInfo) {
+            SubscriptionInfo other = (SubscriptionInfo)obj;
+            result = (clientId == null && other.clientId == null || clientId != null
+                                                                    && other.clientId != null
+                                                                    && clientId.equals(other.clientId))
+                     && (subscriptionName == null && other.subscriptionName == null || subscriptionName != null
+                                                                                       && other.subscriptionName != null
+                                                                                       && subscriptionName
+                                                                                           .equals(other.subscriptionName));
+        }
+        return result;
+    }
+
+    /**
+     * The destination the client originally subscribed to.. This may not match
+     * the {@see getDestination} method if the subscribed destination uses
+     * patterns or composites.
+     * 
+     * If the subscribed destinationis not set, this just ruturns the
+     * desitination.
+     * 
+     * @openwire:property version=3
+     */
+    public ActiveMQDestination getSubscribedDestination() {
+        if (subscribedDestination == null) {
+            return getDestination();
+        }
+        return subscribedDestination;
+    }
+
+    public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+        this.subscribedDestination = subscribedDestination;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.6 $
+ */
+public abstract class TransactionId implements DataStructure {
+
+    // The three accessors below are implemented by subclasses that are not
+    // visible here. NOTE(review): presumably exactly one of
+    // isXATransaction()/isLocalTransaction() is true for a given id, and
+    // getTransactionKey() yields a string form of the id - confirm against
+    // the concrete subclasses.
+    public abstract boolean isXATransaction();
+    public abstract boolean isLocalTransaction();
+    public abstract String getTransactionKey();
+
+    /**
+     * @return always false in this base class
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="7"
+ * @version $Revision: 1.10 $
+ */
+public class TransactionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;
+
+    // Operation codes stored in the type field; visit() selects the visitor
+    // callback from them.
+    public static final byte BEGIN = 0;
+    public static final byte PREPARE = 1;
+    public static final byte COMMIT_ONE_PHASE = 2;
+    public static final byte COMMIT_TWO_PHASE = 3;
+    public static final byte ROLLBACK = 4;
+    public static final byte RECOVER = 5;
+    public static final byte FORGET = 6;
+    public static final byte END = 7;
+
+    protected byte type;
+    protected ConnectionId connectionId;
+    protected TransactionId transactionId;
+
+    /**
+     * Creates a command with null ids and a type of zero ({@link #BEGIN}).
+     */
+    public TransactionInfo() {
+    }
+
+    /**
+     * @param connectionId the connection id stored on the command
+     * @param transactionId the transaction id stored on the command
+     * @param type one of the operation codes declared on this class
+     */
+    public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
+        this.type = type;
+        this.transactionId = transactionId;
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @return the data structure type code, {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return this.transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getType() {
+        return this.type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    /**
+     * Dispatches to the visitor callback that matches the current type.
+     *
+     * @return whatever the selected visitor callback returns
+     * @throws IOException if the type is none of the declared operation codes
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        switch (type) {
+        case BEGIN:
+            return visitor.processBeginTransaction(this);
+        case PREPARE:
+            return visitor.processPrepareTransaction(this);
+        case COMMIT_ONE_PHASE:
+            return visitor.processCommitTransactionOnePhase(this);
+        case COMMIT_TWO_PHASE:
+            return visitor.processCommitTransactionTwoPhase(this);
+        case ROLLBACK:
+            return visitor.processRollbackTransaction(this);
+        case RECOVER:
+            return visitor.processRecoverTransactions(this);
+        case FORGET:
+            return visitor.processForgetTransaction(this);
+        case END:
+            return visitor.processEndTransaction(this);
+        default:
+            break;
+        }
+        // Reached only when no operation code matched.
+        throw new IOException("Transaction info type unknown: " + type);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Command describing wire format settings: a magic byte sequence, a protocol
+ * version and a map of named properties (cache, stack trace, TCP no-delay,
+ * size prefix, tight encoding, inactivity durations and cache size).
+ * <p>
+ * The properties are held in two forms: the marshalled {@link ByteSequence}
+ * that travels on the wire and a lazily unmarshalled {@link Map}. The
+ * marshalled form is rebuilt in {@link #beforeMarshall(WireFormat)} whenever
+ * it has been invalidated by a property change.
+ *
+ * @openwire:marshaller code="1"
+ * @version $Revision$
+ */
+public class WireFormatInfo implements Command, MarshallAware {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
+    // Limit handed to MarshallingSupport.unmarshalPrimitiveMap when decoding
+    // the property map.
+    private static final int MAX_PROPERTY_SIZE = 1024 * 4;
+    // Byte signature that isValid() expects to find in the magic field.
+    private static final byte MAGIC[] = new byte[] {'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+    protected byte magic[] = MAGIC;
+    protected int version;
+    protected ByteSequence marshalledProperties;
+
+    protected transient Map<String, Object> properties;
+    private transient Endpoint from;
+    private transient Endpoint to;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isWireFormatInfo() {
+        return true;
+    }
+
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 size=8 testSize=-1
+     */
+    public byte[] getMagic() {
+        return magic;
+    }
+
+    public void setMagic(byte[] magic) {
+        this.magic = magic;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public ByteSequence getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(ByteSequence marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * The endpoint within the transport where this message came from.
+     */
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * The endpoint within the transport where this message is going to - null
+     * means all endpoints.
+     */
+    public Endpoint getTo() {
+        return to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+
+    // ////////////////////
+    // 
+    // Implementation Methods.
+    //
+    // ////////////////////
+
+    /**
+     * Looks up a single property, unmarshalling the property map on first
+     * use.
+     *
+     * @param name the property name
+     * @return the property value, or null when the property is not set or no
+     *         properties exist at all
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarshalProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    /**
+     * Returns a read-only view of all properties, unmarshalling them on first
+     * use.
+     *
+     * @return an unmodifiable map; empty when no properties exist
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                // Typed empty map avoids the raw EMPTY_MAP constant and the
+                // unchecked warning that came with it.
+                return Collections.<String, Object>emptyMap();
+            }
+            properties = unmarshalProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    /**
+     * Drops both the marshalled and the unmarshalled form of the properties.
+     */
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    /**
+     * Sets a property and invalidates the cached marshalled form so that the
+     * change is picked up by the next {@link #beforeMarshall(WireFormat)}.
+     *
+     * @param name the property name
+     * @param value the property value
+     * @throws IOException if existing marshalled properties cannot be decoded
+     */
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+        // The map may already have been unmarshalled by a getter, in which
+        // case lazyCreateProperties() left the marshalled bytes in place.
+        // They no longer match the map, and beforeMarshall() only re-marshals
+        // when this field is null.
+        marshalledProperties = null;
+    }
+
+    /**
+     * Makes sure the mutable property map exists, decoding the marshalled
+     * form when there is one.
+     *
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                properties = new HashMap<String, Object>();
+            } else {
+                properties = unmarshalProperties(marshalledProperties);
+                marshalledProperties = null;
+            }
+        }
+    }
+
+    // Decodes a marshalled property map.
+    private Map<String, Object> unmarshalProperties(ByteSequence marshalledProperties) throws IOException {
+        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+    }
+
+    public void beforeMarshall(WireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            MarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toByteSequence();
+        }
+    }
+
+    public void afterMarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    public void afterUnmarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    /**
+     * @return true when the magic field is set and matches the expected
+     *         signature bytes
+     */
+    public boolean isValid() {
+        return magic != null && Arrays.equals(magic, MAGIC);
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isCacheEnabled() throws IOException {
+        // equals() rather than ==: the value comes out of a map and is not
+        // guaranteed to be the canonical Boolean.TRUE instance.
+        return Boolean.TRUE.equals(getProperty("CacheEnabled"));
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) throws IOException {
+        setProperty("CacheEnabled", Boolean.valueOf(cacheEnabled));
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isStackTraceEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("StackTraceEnabled"));
+    }
+
+    public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
+        setProperty("StackTraceEnabled", Boolean.valueOf(stackTraceEnabled));
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isTcpNoDelayEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("TcpNoDelayEnabled"));
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
+        setProperty("TcpNoDelayEnabled", Boolean.valueOf(tcpNoDelayEnabled));
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isSizePrefixDisabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("SizePrefixDisabled"));
+    }
+
+    /**
+     * Note that, despite the parameter name, the value is stored as-is under
+     * the "SizePrefixDisabled" key, i.e. true disables the size prefix.
+     */
+    public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
+        setProperty("SizePrefixDisabled", Boolean.valueOf(prefixPacketSize));
+    }
+
+    /**
+     * @throws IOException
+     */
+    public boolean isTightEncodingEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("TightEncodingEnabled"));
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
+        setProperty("TightEncodingEnabled", Boolean.valueOf(tightEncodingEnabled));
+    }
+
+    /**
+     * @return the "MaxInactivityDuration" property, or 0 when it is not set
+     * @throws IOException
+     */
+    public long getMaxInactivityDuration() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDuration");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+        setProperty("MaxInactivityDuration", Long.valueOf(maxInactivityDuration));
+    }
+
+    /**
+     * @return the "MaxInactivityDurationInitalDelay" property, or 0 when it
+     *         is not set
+     * @throws IOException
+     */
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+    }
+
+    /**
+     * @return the "CacheSize" property, or 0 when it is not set
+     * @throws IOException
+     */
+    public int getCacheSize() throws IOException {
+        Integer i = (Integer)getProperty("CacheSize");
+        return i == null ? 0 : i.intValue();
+    }
+
+    public void setCacheSize(int cacheSize) throws IOException {
+        setProperty("CacheSize", Integer.valueOf(cacheSize));
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processWireFormat(this);
+    }
+
+    public String toString() {
+        Map<String, Object> p = null;
+        try {
+            p = getProperties();
+        } catch (IOException ignore) {
+            // Best effort: toString() must not fail, so undecodable
+            // properties are simply reported as null.
+        }
+        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+    }
+
+    // Renders the bytes as comma separated characters; null-safe because
+    // setMagic(null) is permitted (isValid() checks for it).
+    private String toString(byte[] data) {
+        if (data == null) {
+            return "null";
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (int i = 0; i < data.length; i++) {
+            if (i != 0) {
+                sb.append(',');
+            }
+            sb.append((char)data[i]);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    // /////////////////////////////////////////////////////////////
+    //
+    // These are not implemented.
+    //
+    // /////////////////////////////////////////////////////////////
+
+    public void setCommandId(int value) {
+    }
+
+    public int getCommandId() {
+        return 0;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
+    }
+
+    public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
+        return null;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.util.Arrays;
+import javax.transaction.xa.Xid;
+import org.apache.activemq.util.HexSupport;
+
+/**
+ * Transaction id made of the three parts of an XA {@link Xid}: the format
+ * id, the global transaction id and the branch qualifier.
+ * <p>
+ * The hash code and the string key are computed lazily and cached; every
+ * setter discards both caches so they never describe outdated field values.
+ *
+ * @openwire:marshaller code="112"
+ * @version $Revision: 1.6 $
+ */
+public class XATransactionId extends TransactionId implements Xid, Comparable {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_XA_TRANSACTION_ID;
+
+    private int formatId;
+    private byte[] branchQualifier;
+    private byte[] globalTransactionId;
+
+    // Lazily computed caches; 0 / null mean "not computed yet".
+    private transient int hash;
+    private transient String transactionKey;
+
+    public XATransactionId() {
+    }
+
+    /**
+     * Copies the three id parts from another {@link Xid}. The byte arrays
+     * returned by the source are stored as-is, not cloned.
+     *
+     * @param xid the id to copy from
+     */
+    public XATransactionId(Xid xid) {
+        this.formatId = xid.getFormatId();
+        this.globalTransactionId = xid.getGlobalTransactionId();
+        this.branchQualifier = xid.getBranchQualifier();
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the cached textual key of the form
+     *         {@code XID:<formatId>:<global id hex>:<branch qualifier hex>}
+     */
+    public synchronized String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBytes(globalTransactionId) + ":"
+                             + HexSupport.toHexFromBytes(branchQualifier);
+        }
+        return transactionKey;
+    }
+
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    public boolean isXATransaction() {
+        return true;
+    }
+
+    public boolean isLocalTransaction() {
+        return false;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getFormatId() {
+        return formatId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getBranchQualifier() {
+        return branchQualifier;
+    }
+
+    public void setBranchQualifier(byte[] branchQualifier) {
+        this.branchQualifier = branchQualifier;
+        resetCaches();
+    }
+
+    public void setFormatId(int formatId) {
+        this.formatId = formatId;
+        resetCaches();
+    }
+
+    public void setGlobalTransactionId(byte[] globalTransactionId) {
+        this.globalTransactionId = globalTransactionId;
+        resetCaches();
+    }
+
+    // Discards the values derived from the id fields. The key must be
+    // dropped together with the hash, otherwise toString() and compareTo()
+    // keep using the key of the previous field values.
+    private void resetCaches() {
+        this.hash = 0;
+        this.transactionKey = null;
+    }
+
+    public int hashCode() {
+        if (hash == 0) {
+            hash = formatId;
+            hash = hash(globalTransactionId, hash);
+            hash = hash(branchQualifier, hash);
+            if (hash == 0) {
+                // 0 is the "not computed" marker, so substitute a constant.
+                hash = 0xaceace;
+            }
+        }
+        return hash;
+    }
+
+    // Folds the bytes into the running hash. A null array contributes
+    // nothing, matching equals(), which accepts null arrays through
+    // Arrays.equals.
+    private static int hash(byte[] bytes, int hash) {
+        if (bytes == null) {
+            return hash;
+        }
+        int size = bytes.length;
+        for (int i = 0; i < size; i++) {
+            hash ^= bytes[i] << ((i % 4) * 8);
+        }
+        return hash;
+    }
+
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return false;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return xid.formatId == formatId && Arrays.equals(xid.globalTransactionId, globalTransactionId)
+               && Arrays.equals(xid.branchQualifier, branchQualifier);
+    }
+
+    /**
+     * Orders ids by their transaction key. A null argument or an object of
+     * any other class yields -1.
+     */
+    public int compareTo(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return -1;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return getTransactionKey().compareTo(xid.getTransactionKey());
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html Mon Jun  1 18:37:41 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+Command objects used via the Command Pattern to communicate among nodes
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.filter;
+
+import org.apache.activemq.broker.openwire.OpenwireMessageEvaluationContext;
+import org.apache.activemq.command.Message;
+
+/**
+ * Boolean expression that rejects messages produced on one particular
+ * connection, identified by the connection id given at construction time.
+ */
+public class NoLocalExpression implements BooleanExpression {
+
+    private final String connectionId;
+
+    /**
+     * @param connectionId the id of the connection whose own messages must
+     *            not match
+     */
+    public NoLocalExpression(String connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @param mec the evaluation context; it is cast to
+     *            {@link OpenwireMessageEvaluationContext}
+     * @return false for dropped messages and for messages whose producer
+     *         belongs to the configured connection, true otherwise
+     */
+    public boolean matches(MessageEvaluationContext mec) {
+        OpenwireMessageEvaluationContext context = (OpenwireMessageEvaluationContext)mec;
+        Message candidate = context.getMessage();
+        if (!candidate.isDropped()) {
+            // Compare against the connection the producer of the message belongs to.
+            return !connectionId.equals(candidate.getMessageId().getProducerId().getConnectionId());
+        }
+        return false;
+    }
+
+    /**
+     * @return the result of {@link #matches(MessageEvaluationContext)} as a
+     *         {@link Boolean}
+     */
+    public Object evaluate(MessageEvaluationContext message) {
+        return Boolean.valueOf(matches(message));
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public final class BooleanStream {
+
+    byte data[] = new byte[48];
+    short arrayLimit;
+    short arrayPos;
+    byte bytePos;
+
+    public boolean readBoolean() throws IOException {
+        assert arrayPos <= arrayLimit;
+        byte b = data[arrayPos];
+        boolean rc = ((b >> bytePos) & 0x01) != 0;
+        bytePos++;
+        if (bytePos >= 8) {
+            bytePos = 0;
+            arrayPos++;
+        }
+        return rc;
+    }
+
+    public void writeBoolean(boolean value) throws IOException {
+        if (bytePos == 0) {
+            arrayLimit++;
+            if (arrayLimit >= data.length) {
+                // re-grow the array.
+                byte d[] = new byte[data.length * 2];
+                System.arraycopy(data, 0, d, 0, data.length);
+                data = d;
+            }
+        }
+        if (value) {
+            data[arrayPos] |= 0x01 << bytePos;
+        }
+        bytePos++;
+        if (bytePos >= 8) {
+            bytePos = 0;
+            arrayPos++;
+        }
+    }
+
+    public void marshal(DataOutput dataOut) throws IOException {
+        if (arrayLimit < 64) {
+            dataOut.writeByte(arrayLimit);
+        } else if (arrayLimit < 256) { // max value of unsigned byte
+            dataOut.writeByte(0xC0);
+            dataOut.writeByte(arrayLimit);
+        } else {
+            dataOut.writeByte(0x80);
+            dataOut.writeShort(arrayLimit);
+        }
+
+        dataOut.write(data, 0, arrayLimit);
+        clear();
+    }
+
+    public void marshal(ByteBuffer dataOut) {
+        if (arrayLimit < 64) {
+            dataOut.put((byte)arrayLimit);
+        } else if (arrayLimit < 256) { // max value of unsigned byte
+            dataOut.put((byte)0xC0);
+            dataOut.put((byte)arrayLimit);
+        } else {
+            dataOut.put((byte)0x80);
+            dataOut.putShort(arrayLimit);
+        }
+
+        dataOut.put(data, 0, arrayLimit);
+    }
+
+    public void unmarshal(DataInput dataIn) throws IOException {
+
+        arrayLimit = (short)(dataIn.readByte() & 0xFF);
+        if (arrayLimit == 0xC0) {
+            arrayLimit = (short)(dataIn.readByte() & 0xFF);
+        } else if (arrayLimit == 0x80) {
+            arrayLimit = dataIn.readShort();
+        }
+        if (data.length < arrayLimit) {
+            data = new byte[arrayLimit];
+        }
+        dataIn.readFully(data, 0, arrayLimit);
+        clear();
+    }
+
+    public void clear() {
+        arrayPos = 0;
+        bytePos = 0;
+    }
+
+    public int marshalledSize() {
+        if (arrayLimit < 64) {
+            return 1 + arrayLimit;
+        } else if (arrayLimit < 256) {
+            return 2 + arrayLimit;
+        } else {
+            return 3 + arrayLimit;
+        }
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.util.Comparator;
+
+import org.apache.activemq.command.Command;
+
+/**
+ * A {@link Comparator} that orders commands by the value of
+ * {@link Command#getCommandId()}.
+ * 
+ * @version $Revision: 564814 $
+ */
+public class CommandIdComparator implements Comparator<Command> {
+
+    /**
+     * Returns the difference between the two command ids.
+     * <p>
+     * NOTE(review): the difference is computed with plain int subtraction, so
+     * it wraps around when the ids are more than {@link Integer#MAX_VALUE}
+     * apart. Whether that is intended for wrapping id sequences should be
+     * confirmed against the callers.
+     */
+    public int compare(Command c1, Command c2) {
+        final int firstId = c1.getCommandId();
+        final int secondId = c2.getCommandId();
+        return firstId - secondId;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.DataStructure;
+
+/**
+ * Contract for an object that marshals and unmarshals one kind of
+ * {@link DataStructure} for an {@link OpenWireFormat}. It offers a "tight"
+ * variant that works together with a {@link BooleanStream} and a "loose"
+ * variant that does not.
+ */
+public interface DataStreamMarshaller {
+
+    /** @return the type code of the data structure handled by this marshaller */
+    byte getDataStructureType();
+    /** @return a new instance of the data structure handled by this marshaller */
+    DataStructure createObject();
+
+    // NOTE(review): presumably the first tight pass records flags in bs and
+    // returns the number of bytes the second pass will write - confirm
+    // against the generated marshallers.
+    int tightMarshal1(OpenWireFormat format, Object c, BooleanStream bs) throws IOException;
+    // Second tight pass: receives the output as well as the BooleanStream.
+    void tightMarshal2(OpenWireFormat format, Object c, DataOutput ds, BooleanStream bs) throws IOException;
+    // Tight decoding into the given object, using the input and the BooleanStream.
+    void tightUnmarshal(OpenWireFormat format, Object data, DataInput dis, BooleanStream bs) throws IOException;
+
+    // Loose encoding and decoding: no BooleanStream is involved.
+    void looseMarshal(OpenWireFormat format, Object c, DataOutput ds) throws IOException;
+    void looseUnmarshal(OpenWireFormat format, Object data, DataInput dis) throws IOException;
+    
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public final class OpenWireFormat implements WireFormat {
+
+    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
+
+    static final byte NULL_TYPE = CommandTypes.NULL;
+    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
+    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
+
+    private DataStreamMarshaller dataMarshallers[];
+    private int version;
+    private boolean stackTraceEnabled;
+    private boolean tcpNoDelayEnabled;
+    private boolean cacheEnabled;
+    private boolean tightEncodingEnabled;
+    private boolean sizePrefixDisabled;
+
+    // The following fields are used for value caching
+    private short nextMarshallCacheIndex;
+    private short nextMarshallCacheEvictionIndex;
+    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
+    private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+    private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
+    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
+    private WireFormatInfo preferedWireFormatInfo;
+    
+    private AtomicBoolean receivingMessage = new AtomicBoolean(false);
+
+    public OpenWireFormat() {
+        this(DEFAULT_VERSION);
+    }
+
+    public OpenWireFormat(int i) {
+        setVersion(i);
+    }
+
+    public int hashCode() {
+        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
+               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
+               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
+               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
+    }
+
+    public OpenWireFormat copy() {
+        OpenWireFormat answer = new OpenWireFormat();
+        answer.version = version;
+        answer.stackTraceEnabled = stackTraceEnabled;
+        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
+        answer.cacheEnabled = cacheEnabled;
+        answer.tightEncodingEnabled = tightEncodingEnabled;
+        answer.sizePrefixDisabled = sizePrefixDisabled;
+        answer.preferedWireFormatInfo = preferedWireFormatInfo;
+        return answer;
+    }
+
+    public boolean equals(Object object) {
+        if (object == null) {
+            return false;
+        }
+        OpenWireFormat o = (OpenWireFormat)object;
+        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
+               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
+               && o.sizePrefixDisabled == sizePrefixDisabled;
+    }
+
+
+    public String toString() {
+        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
+               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+        // return "OpenWireFormat{id="+id+",
+        // tightEncodingEnabled="+tightEncodingEnabled+"}";
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public synchronized ByteSequence marshal(Object command) throws IOException {
+
+        if (cacheEnabled) {
+            runMarshallCacheEvictionSweep();
+        }
+
+//        MarshallAware ma = null;
+//        // If not using value caching, then the marshaled form is always the
+//        // same
+//        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
+//            ma = (MarshallAware)command;
+//        }
+
+        ByteSequence sequence = null;
+        // if( ma!=null ) {
+        // sequence = ma.getCachedMarshalledForm(this);
+        // }
+
+        if (sequence == null) {
+
+            int size = 1;
+            if (command != null) {
+
+                DataStructure c = (DataStructure)command;
+                byte type = c.getDataStructureType();
+                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+                if (dsm == null) {
+                    throw new IOException("Unknown data type: " + type);
+                }
+                if (tightEncodingEnabled) {
+
+                    BooleanStream bs = new BooleanStream();
+                    size += dsm.tightMarshal1(this, c, bs);
+                    size += bs.marshalledSize();
+
+                    bytesOut.restart(size);
+                    if (!sizePrefixDisabled) {
+                        bytesOut.writeInt(size);
+                    }
+                    bytesOut.writeByte(type);
+                    bs.marshal(bytesOut);
+                    dsm.tightMarshal2(this, c, bytesOut, bs);
+                    sequence = bytesOut.toByteSequence();
+
+                } else {
+                    bytesOut.restart();
+                    if (!sizePrefixDisabled) {
+                        bytesOut.writeInt(0); // we don't know the final size
+                                                // yet but write this here for
+                                                // now.
+                    }
+                    bytesOut.writeByte(type);
+                    dsm.looseMarshal(this, c, bytesOut);
+                    sequence = bytesOut.toByteSequence();
+
+                    if (!sizePrefixDisabled) {
+                        size = sequence.getLength() - 4;
+                        int pos = sequence.offset;
+                        ByteSequenceData.writeIntBig(sequence, size);
+                        sequence.offset = pos;
+                    }
+                }
+
+            } else {
+                bytesOut.restart(5);
+                bytesOut.writeInt(size);
+                bytesOut.writeByte(NULL_TYPE);
+                sequence = bytesOut.toByteSequence();
+            }
+
+            // if( ma!=null ) {
+            // ma.setCachedMarshalledForm(this, sequence);
+            // }
+        }
+        return sequence;
+    }
+
+    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
+        bytesIn.restart(sequence);
+        // DataInputStream dis = new DataInputStream(new
+        // ByteArrayInputStream(sequence));
+
+        if (!sizePrefixDisabled) {
+            int size = bytesIn.readInt();
+            if (sequence.getLength() - 4 != size) {
+                // throw new IOException("Packet size does not match marshaled
+                // size");
+            }
+        }
+
+        Object command = doUnmarshal(bytesIn);
+        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
+        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
+        // }
+        return command;
+    }
+
+    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
+
+        if (cacheEnabled) {
+            runMarshallCacheEvictionSweep();
+        }
+
+        int size = 1;
+        if (o != null) {
+
+            DataStructure c = (DataStructure)o;
+            byte type = c.getDataStructureType();
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + type);
+            }
+            if (tightEncodingEnabled) {
+                BooleanStream bs = new BooleanStream();
+                size += dsm.tightMarshal1(this, c, bs);
+                size += bs.marshalledSize();
+
+                if (!sizePrefixDisabled) {
+                    dataOut.writeInt(size);
+                }
+
+                dataOut.writeByte(type);
+                bs.marshal(dataOut);
+                dsm.tightMarshal2(this, c, dataOut, bs);
+
+            } else {
+                DataOutput looseOut = dataOut;
+
+                if (!sizePrefixDisabled) {
+                    bytesOut.restart();
+                    looseOut = bytesOut;
+                }
+
+                looseOut.writeByte(type);
+                dsm.looseMarshal(this, c, looseOut);
+
+                if (!sizePrefixDisabled) {
+                    ByteSequence sequence = bytesOut.toByteSequence();
+                    dataOut.writeInt(sequence.getLength());
+                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                }
+
+            }
+
+        } else {
+            if (!sizePrefixDisabled) {
+            	dataOut.writeInt(size);
+            }
+            dataOut.writeByte(NULL_TYPE);
+        }
+    }
+
+    public Object unmarshal(DataInput dis) throws IOException {
+        DataInput dataIn = dis;
+        if (!sizePrefixDisabled) {
+            dis.readInt();
+            // int size = dis.readInt();
+            // byte[] data = new byte[size];
+            // dis.readFully(data);
+            // bytesIn.restart(data);
+            // dataIn = bytesIn;
+        }
+        return doUnmarshal(dataIn);
+    }
+
+    /**
+     * Used by NIO or AIO transports
+     */
+    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
+        int size = 1;
+        if (o != null) {
+            DataStructure c = (DataStructure)o;
+            byte type = c.getDataStructureType();
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + type);
+            }
+
+            size += dsm.tightMarshal1(this, c, bs);
+            size += bs.marshalledSize();
+        }
+        return size;
+    }
+
+    /**
+     * Used by NIO or AIO transports; note that the size is not written as part
+     * of this method.
+     */
+    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
+        if (cacheEnabled) {
+            runMarshallCacheEvictionSweep();
+        }
+
+        if (o != null) {
+            DataStructure c = (DataStructure)o;
+            byte type = c.getDataStructureType();
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + type);
+            }
+            ds.writeByte(type);
+            bs.marshal(ds);
+            dsm.tightMarshal2(this, c, ds, bs);
+        }
+    }
+
+    /**
+     * Allows you to dynamically switch the version of the openwire protocol
+     * being used.
+     * 
+     * @param version
+     */
+    public void setVersion(int version) {
+        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
+        Class mfClass;
+        try {
+            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
+                                                                         + ", could not load " + mfName)
+                .initCause(e);
+        }
+        try {
+            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
+            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
+        } catch (Throwable e) {
+            throw (IllegalArgumentException)new IllegalArgumentException(
+                                                                         "Invalid version: "
+                                                                             + version
+                                                                             + ", "
+                                                                             + mfName
+                                                                             + " does not properly implement the createMarshallerMap method.")
+                .initCause(e);
+        }
+        this.version = version;
+    }
+
+    public Object doUnmarshal(DataInput dis) throws IOException {
+        byte dataType = dis.readByte();
+        receivingMessage.set(true);
+        if (dataType != NULL_TYPE) {
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + dataType);
+            }
+            Object data = dsm.createObject();
+            if (this.tightEncodingEnabled) {
+                BooleanStream bs = new BooleanStream();
+                bs.unmarshal(dis);
+                dsm.tightUnmarshal(this, data, dis, bs);
+            } else {
+                dsm.looseUnmarshal(this, data, dis);
+            }
+            receivingMessage.set(false);
+            return data;
+        } else {
+            receivingMessage.set(false);
+            return null;
+        }
+    }
+
+    // public void debug(String msg) {
+    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
+    // System.out.println(t+": "+msg);
+    // }
+    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
+        bs.writeBoolean(o != null);
+        if (o == null) {
+            return 0;
+        }
+
+        if (o.isMarshallAware()) {
+            // MarshallAware ma = (MarshallAware)o;
+            ByteSequence sequence = null;
+            // sequence=ma.getCachedMarshalledForm(this);
+            bs.writeBoolean(sequence != null);
+            if (sequence != null) {
+                return 1 + sequence.getLength();
+            }
+        }
+
+        byte type = o.getDataStructureType();
+        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+        if (dsm == null) {
+            throw new IOException("Unknown data type: " + type);
+        }
+        return 1 + dsm.tightMarshal1(this, o, bs);
+    }
+
+    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
+        throws IOException {
+        if (!bs.readBoolean()) {
+            return;
+        }
+
+        byte type = o.getDataStructureType();
+        ds.writeByte(type);
+
+        if (o.isMarshallAware() && bs.readBoolean()) {
+
+            // We should not be doing any caching
+            throw new IOException("Corrupted stream");
+            // MarshallAware ma = (MarshallAware) o;
+            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
+            // ds.write(sequence.getData(), sequence.getOffset(),
+            // sequence.getLength());
+
+        } else {
+
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + type);
+            }
+            dsm.tightMarshal2(this, o, ds, bs);
+
+        }
+    }
+
+    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
+        if (bs.readBoolean()) {
+
+            byte dataType = dis.readByte();
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + dataType);
+            }
+            DataStructure data = dsm.createObject();
+
+            if (data.isMarshallAware() && bs.readBoolean()) {
+
+                dis.readInt();
+                dis.readByte();
+
+                BooleanStream bs2 = new BooleanStream();
+                bs2.unmarshal(dis);
+                dsm.tightUnmarshal(this, data, dis, bs2);
+
+                // TODO: extract the sequence from the dis and associate it.
+                // MarshallAware ma = (MarshallAware)data
+                // ma.setCachedMarshalledForm(this, sequence);
+
+            } else {
+                dsm.tightUnmarshal(this, data, dis, bs);
+            }
+
+            return data;
+        } else {
+            return null;
+        }
+    }
+
+    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
+        if (dis.readBoolean()) {
+
+            byte dataType = dis.readByte();
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + dataType);
+            }
+            DataStructure data = dsm.createObject();
+            dsm.looseUnmarshal(this, data, dis);
+            return data;
+
+        } else {
+            return null;
+        }
+    }
+
+    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
+        dataOut.writeBoolean(o != null);
+        if (o != null) {
+            byte type = o.getDataStructureType();
+            dataOut.writeByte(type);
+            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            if (dsm == null) {
+                throw new IOException("Unknown data type: " + type);
+            }
+            dsm.looseMarshal(this, o, dataOut);
+        }
+    }
+
+    public void runMarshallCacheEvictionSweep() {
+        // Do we need to start evicting??
+        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
+
+            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
+            marshallCache[nextMarshallCacheEvictionIndex] = null;
+
+            nextMarshallCacheEvictionIndex++;
+            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
+                nextMarshallCacheEvictionIndex = 0;
+            }
+
+        }
+    }
+
+    public Short getMarshallCacheIndex(DataStructure o) {
+        return marshallCacheMap.get(o);
+    }
+
+    public Short addToMarshallCache(DataStructure o) {
+        short i = nextMarshallCacheIndex++;
+        if (nextMarshallCacheIndex >= marshallCache.length) {
+            nextMarshallCacheIndex = 0;
+        }
+
+        // We can only cache that item if there is space left.
+        if (marshallCacheMap.size() < marshallCache.length) {
+            marshallCache[i] = o;
+            Short index = new Short(i);
+            marshallCacheMap.put(o, index);
+            return index;
+        } else {
+            // Use -1 to indicate that the value was not cached due to cache
+            // being full.
+            return new Short((short)-1);
+        }
+    }
+
+    public void setInUnmarshallCache(short index, DataStructure o) {
+
+        // There was no space left in the cache, so we can't
+        // put this in the cache.
+        if (index == -1) {
+            return;
+        }
+
+        unmarshallCache[index] = o;
+    }
+
+    public DataStructure getFromUnmarshallCache(short index) {
+        return unmarshallCache[index];
+    }
+
+    public void setStackTraceEnabled(boolean b) {
+        stackTraceEnabled = b;
+    }
+
+    public boolean isStackTraceEnabled() {
+        return stackTraceEnabled;
+    }
+
+    public boolean isTcpNoDelayEnabled() {
+        return tcpNoDelayEnabled;
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+    }
+
+    public boolean isCacheEnabled() {
+        return cacheEnabled;
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    public boolean isTightEncodingEnabled() {
+        return tightEncodingEnabled;
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+        this.tightEncodingEnabled = tightEncodingEnabled;
+    }
+
+    public boolean isSizePrefixDisabled() {
+        return sizePrefixDisabled;
+    }
+
+    public void setSizePrefixDisabled(boolean prefixPacketSize) {
+        this.sizePrefixDisabled = prefixPacketSize;
+    }
+
+    public void setPreferedWireFormatInfo(WireFormatInfo info) {
+        this.preferedWireFormatInfo = info;
+    }
+
+    public WireFormatInfo getPreferedWireFormatInfo() {
+        return preferedWireFormatInfo;
+    }
+    
+    public boolean inReceive() {
+    	return receivingMessage.get();
+    }
+
+    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
+
+        if (preferedWireFormatInfo == null) {
+            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
+        }
+
+        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
+        info.setVersion(this.getVersion());
+
+        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
+        info.setStackTraceEnabled(this.stackTraceEnabled);
+
+        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
+        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
+
+        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
+        info.setCacheEnabled(this.cacheEnabled);
+
+        this.tightEncodingEnabled = info.isTightEncodingEnabled()
+                                    && preferedWireFormatInfo.isTightEncodingEnabled();
+        info.setTightEncodingEnabled(this.tightEncodingEnabled);
+
+        this.sizePrefixDisabled = info.isSizePrefixDisabled()
+                                  && preferedWireFormatInfo.isSizePrefixDisabled();
+        info.setSizePrefixDisabled(this.sizePrefixDisabled);
+
+        if (cacheEnabled) {
+
+            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
+            info.setCacheSize(size);
+
+            if (size == 0) {
+                size = MARSHAL_CACHE_SIZE;
+            }
+
+            marshallCache = new DataStructure[size];
+            unmarshallCache = new DataStructure[size];
+            nextMarshallCacheIndex = 0;
+            nextMarshallCacheEvictionIndex = 0;
+            marshallCacheMap = new HashMap<DataStructure, Short>();
+        } else {
+            marshallCache = null;
+            unmarshallCache = null;
+            nextMarshallCacheIndex = 0;
+            nextMarshallCacheEvictionIndex = 0;
+            marshallCacheMap = null;
+        }
+
+    }
+
+    protected int min(int version1, int version2) {
+        if (version1 < version2 && version1 > 0 || version2 <= 0) {
+            return version1;
+        }
+        return version2;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * {@link WireFormatFactory} that produces {@link OpenWireFormat} instances
+ * carrying a preferred {@link WireFormatInfo} built from this factory's
+ * properties.
+ * 
+ * @version $Revision$
+ */
+public class OpenWireFormatFactory implements WireFormatFactory {
+
+    //
+    // The default values here are what the wire format changes to after a
+    // default negotiation.
+    //
+
+    private int version = OpenWireFormat.DEFAULT_VERSION;
+    private boolean stackTraceEnabled = true;
+    private boolean tcpNoDelayEnabled = true;
+    private boolean cacheEnabled = true;
+    private boolean tightEncodingEnabled = true;
+    private boolean sizePrefixDisabled;
+    private long maxInactivityDuration = 30 * 1000;
+    private long maxInactivityDurationInitalDelay = 10 * 1000;
+    private int cacheSize = 1024;
+
+    /**
+     * Builds a default-version {@link OpenWireFormat} whose preferred wire
+     * format info reflects the current property values of this factory.
+     * 
+     * @return the new wire format
+     * @throws IllegalStateException if the wire format info rejects one of the
+     *             property values
+     */
+    public WireFormat createWireFormat() {
+        final WireFormatInfo preferred = new WireFormatInfo();
+        preferred.setVersion(version);
+
+        try {
+            preferred.setStackTraceEnabled(stackTraceEnabled);
+            preferred.setCacheEnabled(cacheEnabled);
+            preferred.setTcpNoDelayEnabled(tcpNoDelayEnabled);
+            preferred.setTightEncodingEnabled(tightEncodingEnabled);
+            preferred.setSizePrefixDisabled(sizePrefixDisabled);
+            preferred.setMaxInactivityDuration(maxInactivityDuration);
+            preferred.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
+            preferred.setCacheSize(cacheSize);
+        } catch (Exception cause) {
+            throw new IllegalStateException("Could not configure WireFormatInfo", cause);
+        }
+
+        final OpenWireFormat format = new OpenWireFormat();
+        format.setPreferedWireFormatInfo(preferred);
+        return format;
+    }
+
+    /** @return the protocol version placed in the preferred info */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /** @return whether stack traces are requested */
+    public boolean isStackTraceEnabled() {
+        return stackTraceEnabled;
+    }
+
+    public void setStackTraceEnabled(boolean stackTraceEnabled) {
+        this.stackTraceEnabled = stackTraceEnabled;
+    }
+
+    /** @return whether TCP no-delay is requested */
+    public boolean isTcpNoDelayEnabled() {
+        return tcpNoDelayEnabled;
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+    }
+
+    /** @return whether value caching is requested */
+    public boolean isCacheEnabled() {
+        return cacheEnabled;
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    /** @return whether tight encoding is requested */
+    public boolean isTightEncodingEnabled() {
+        return tightEncodingEnabled;
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+        this.tightEncodingEnabled = tightEncodingEnabled;
+    }
+
+    /** @return whether the size prefix is requested to be disabled */
+    public boolean isSizePrefixDisabled() {
+        return sizePrefixDisabled;
+    }
+
+    public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
+        this.sizePrefixDisabled = sizePrefixDisabled;
+    }
+
+    /** @return the max inactivity duration placed in the preferred info */
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+
+    /** @return the initial delay placed in the preferred info */
+    public long getMaxInactivityDurationInitalDelay() {
+        return maxInactivityDurationInitalDelay;
+    }
+
+    public void setMaxInactivityDurationInitalDelay(
+            long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+    }
+
+    /** @return the cache size placed in the preferred info */
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
------------------------------------------------------------------------------
    svn:executable = *