You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC

svn commit: r780773 [30/31] - in /activemq/sandbox/activemq-flow: activemq-client/ activemq-client/src/main/java/org/ activemq-client/src/main/java/org/apache/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/...

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tracks the state of a connection so a newly established transport can be
+ * re-initialized to the state that was tracked.
+ * <p>
+ * Commands are handed to {@link #track(Command)}, which dispatches them to the
+ * matching <code>process...</code> method through <code>command.visit(this)</code>.
+ * Connection, session, producer, consumer, temporary destination and
+ * (optionally) transaction state is recorded per connection id. Messages sent
+ * outside a tracked transaction are kept in a cache that evicts its eldest
+ * entry once the accounted size exceeds {@link #getMaxCacheSize()}.
+ * {@link #restore(Transport)} replays everything onto a transport.
+ * <p>
+ * The connection map is a ConcurrentHashMap, but the message cache and its
+ * size counter are accessed without any synchronization in this class.
+ * 
+ * @version $Revision$
+ */
+public class ConnectionStateTracker extends CommandVisitorAdapter {
+    private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
+
+    /**
+     * Shared response for commands that were tracked but need no follow-up
+     * action; it carries no runnable, so isWaitingForResponse() is false.
+     */
+    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
+
+    private boolean trackTransactions;
+    private boolean restoreSessions = true;
+    private boolean restoreConsumers = true;
+    private boolean restoreProducers = true;
+    private boolean restoreTransaction = true;
+    private boolean trackMessages = true;
+    // Limit compared against the sum of Message.getSize() values
+    // (presumably bytes -- confirm against Message.getSize()).
+    private int maxCacheSize = 128 * 1024;
+    // Running total maintained by trackBack() and by cache eviction below.
+    private int currentCacheSize;
+    private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
+            // Invoked by LinkedHashMap after each insertion: drop the eldest
+            // message while the accounted size is over the limit.
+            boolean overLimit = currentCacheSize > maxCacheSize;
+            if (overLimit) {
+                currentCacheSize -= eldest.getValue().getSize();
+            }
+            return overLimit;
+        }
+    };
+
+    /**
+     * Deferred action that forgets a transaction; it is wrapped in a
+     * {@link Tracked} response by the commit and rollback handlers.
+     */
+    private class RemoveTransactionAction implements Runnable {
+        private final TransactionInfo info;
+
+        public RemoveTransactionAction(TransactionInfo info) {
+            this.info = info;
+        }
+
+        public void run() {
+            ConnectionState cs = connectionStates.get(info.getConnectionId());
+            // The connection may have been removed between sending the
+            // command and running this action; then there is nothing to do.
+            if (cs != null) {
+                cs.removeTransactionState(info.getTransactionId());
+            }
+        }
+    }
+
+    /**
+     * Dispatches the command to the matching <code>process...</code> method.
+     * 
+     * @param command the command to track
+     * @return null if the command is not state tracked.
+     * @throws IOException if visiting the command fails; any non-IOException
+     *             failure is converted with IOExceptionSupport.create
+     */
+    public Tracked track(Command command) throws IOException {
+        try {
+            return (Tracked)command.visit(this);
+        } catch (IOException e) {
+            throw e;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Adds the size of a non-transacted message to the cache size counter.
+     * Does nothing when message tracking is off, the command is null or not
+     * a message, or the message belongs to a transaction.
+     * 
+     * @param command the command to account for
+     */
+    public void trackBack(Command command) {
+        if (trackMessages && command != null && command.isMessage()) {
+            Message message = (Message) command;
+            if (message.getTransactionId() == null) {
+                currentCacheSize += message.getSize();
+            }
+        }
+    }
+
+    /**
+     * Replays the tracked state onto the given transport: for every tracked
+     * connection its info, temporary destinations, sessions and transactions
+     * (the latter two only if enabled), followed by all cached messages.
+     * 
+     * @param transport the transport to send the commands to
+     * @throws IOException if the transport fails to send a command
+     */
+    public void restore(Transport transport) throws IOException {
+        // Restore the connections.
+        for (ConnectionState connectionState : connectionStates.values()) {
+            transport.oneway(connectionState.getInfo());
+            restoreTempDestinations(transport, connectionState);
+
+            if (restoreSessions) {
+                restoreSessions(transport, connectionState);
+            }
+
+            if (restoreTransaction) {
+                restoreTransactions(transport, connectionState);
+            }
+        }
+        // now flush messages
+        for (Message msg : messageCache.values()) {
+            transport.oneway(msg);
+        }
+    }
+
+    /**
+     * Re-sends every command recorded for each transaction of the connection.
+     */
+    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+        // Cast kept: relies only on the elements being TransactionState, not
+        // on how ConnectionState declares the collection.
+        for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
+            TransactionState transactionState = (TransactionState)iter.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("tx: " + transactionState.getId());
+            }
+            for (Command command : transactionState.getCommands()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay: " + command);
+                }
+                transport.oneway(command);
+            }
+        }
+    }
+
+    /**
+     * Re-sends each session of the connection, followed by its producers and
+     * consumers when those are enabled.
+     * 
+     * @param transport
+     * @param connectionState
+     * @throws IOException
+     */
+    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
+        // Restore the connection's sessions
+        for (Iterator iter = connectionState.getSessionStates().iterator(); iter.hasNext();) {
+            SessionState sessionState = (SessionState)iter.next();
+            transport.oneway(sessionState.getInfo());
+
+            if (restoreProducers) {
+                restoreProducers(transport, sessionState);
+            }
+
+            if (restoreConsumers) {
+                restoreConsumers(transport, sessionState);
+            }
+        }
+    }
+
+    /**
+     * Re-sends the info of every consumer tracked on the session.
+     * 
+     * @param transport
+     * @param sessionState
+     * @throws IOException
+     */
+    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
+        // Restore the session's consumers
+        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
+            transport.oneway(consumerState.getInfo());
+        }
+    }
+
+    /**
+     * Re-sends the info of every producer tracked on the session.
+     * 
+     * @param transport
+     * @param sessionState
+     * @throws IOException
+     */
+    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
+        // Restore the session's producers
+        for (ProducerState producerState : sessionState.getProducerStates()) {
+            transport.oneway(producerState.getInfo());
+        }
+    }
+
+    /**
+     * Re-sends the temporary destinations tracked on the connection.
+     * 
+     * @param transport
+     * @param connectionState
+     * @throws IOException
+     */
+    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
+        throws IOException {
+        // Restore the connection's temp destinations.
+        for (Iterator iter = connectionState.getTempDesinations().iterator(); iter.hasNext();) {
+            transport.oneway((DestinationInfo)iter.next());
+        }
+    }
+
+    /**
+     * Looks up the tracked state of a connection.
+     * 
+     * @return the state, or null if the id is null or unknown
+     */
+    private ConnectionState findConnectionState(ConnectionId connectionId) {
+        // ConcurrentHashMap rejects null keys, so guard before the lookup.
+        return connectionId == null ? null : connectionStates.get(connectionId);
+    }
+
+    /**
+     * Looks up the tracked state of a session through its parent connection.
+     * 
+     * @return the state, or null if any link in the chain is missing
+     */
+    private SessionState findSessionState(SessionId sessionId) {
+        if (sessionId == null) {
+            return null;
+        }
+        ConnectionState cs = findConnectionState(sessionId.getParentId());
+        return cs == null ? null : cs.getSessionState(sessionId);
+    }
+
+    /**
+     * Looks up the tracked state of the transaction the command refers to.
+     * 
+     * @return the state, or null if the connection or transaction is unknown
+     */
+    private TransactionState findTransactionState(TransactionInfo info) {
+        ConnectionState cs = findConnectionState(info.getConnectionId());
+        return cs == null ? null : cs.getTransactionState(info.getTransactionId());
+    }
+
+    /**
+     * Appends the command to its transaction (if tracked) for later replay.
+     * 
+     * @return the shared marker when transaction tracking is on and info is
+     *         non-null, otherwise null
+     */
+    private Response recordTransactionCommand(TransactionInfo info) {
+        if (trackTransactions && info != null) {
+            TransactionState transactionState = findTransactionState(info);
+            if (transactionState != null) {
+                transactionState.addCommand(info);
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+        return null;
+    }
+
+    /**
+     * Appends a commit or rollback command to its transaction and returns a
+     * response whose runnable forgets the transaction.
+     * 
+     * @return a new Tracked when the transaction is known, otherwise null
+     */
+    private Response recordTransactionCompletion(TransactionInfo info) {
+        if (trackTransactions && info != null) {
+            TransactionState transactionState = findTransactionState(info);
+            if (transactionState != null) {
+                transactionState.addCommand(info);
+                return new Tracked(new RemoveTransactionAction(info));
+            }
+        }
+        return null;
+    }
+
+    /** Records a temporary destination on its connection, if that connection is tracked. */
+    public Response processAddDestination(DestinationInfo info) {
+        if (info != null) {
+            ConnectionState cs = findConnectionState(info.getConnectionId());
+            if (cs != null && info.getDestination().isTemporary()) {
+                cs.addTempDestination(info);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Forgets a temporary destination of a tracked connection. */
+    public Response processRemoveDestination(DestinationInfo info) {
+        if (info != null) {
+            ConnectionState cs = findConnectionState(info.getConnectionId());
+            if (cs != null && info.getDestination().isTemporary()) {
+                cs.removeTempDestination(info.getDestination());
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Records the producer on its session, if that session is tracked. */
+    public Response processAddProducer(ProducerInfo info) {
+        if (info != null && info.getProducerId() != null) {
+            SessionState ss = findSessionState(info.getProducerId().getParentId());
+            if (ss != null) {
+                ss.addProducer(info);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Forgets the producer, if its session is tracked. */
+    public Response processRemoveProducer(ProducerId id) {
+        if (id != null) {
+            SessionState ss = findSessionState(id.getParentId());
+            if (ss != null) {
+                ss.removeProducer(id);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Records the consumer on its session, if that session is tracked. */
+    public Response processAddConsumer(ConsumerInfo info) {
+        // Same guard as processAddProducer: an info without an id is ignored.
+        if (info != null && info.getConsumerId() != null) {
+            SessionState ss = findSessionState(info.getConsumerId().getParentId());
+            if (ss != null) {
+                ss.addConsumer(info);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Forgets the consumer, if its session is tracked. */
+    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
+        if (id != null) {
+            SessionState ss = findSessionState(id.getParentId());
+            if (ss != null) {
+                ss.removeConsumer(id);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Records the session on its connection, if that connection is tracked. */
+    public Response processAddSession(SessionInfo info) {
+        // Same guard as processAddProducer: an info without an id is ignored.
+        if (info != null && info.getSessionId() != null) {
+            ConnectionState cs = findConnectionState(info.getSessionId().getParentId());
+            if (cs != null) {
+                cs.addSession(info);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Forgets the session, if its connection is tracked. */
+    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
+        if (id != null) {
+            ConnectionState cs = findConnectionState(id.getParentId());
+            if (cs != null) {
+                cs.removeSession(id);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Starts tracking a connection, replacing any state held for the same id. */
+    public Response processAddConnection(ConnectionInfo info) {
+        if (info != null) {
+            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /** Stops tracking a connection. */
+    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+        if (id != null) {
+            connectionStates.remove(id);
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    /**
+     * Records a sent message. With transaction tracking on, a transacted
+     * message is appended to its transaction and the shared marker is
+     * returned. Otherwise, with message tracking on, a copy is put in the
+     * message cache and null is returned.
+     */
+    public Response processMessage(Message send) throws Exception {
+        if (send != null) {
+            if (trackTransactions && send.getTransactionId() != null) {
+                ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+                ConnectionState cs = findConnectionState(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(send);
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            } else if (trackMessages) {
+                messageCache.put(send.getMessageId(), send.copy());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Creates the transaction state on its connection and records the begin
+     * command as the transaction's first command.
+     */
+    public Response processBeginTransaction(TransactionInfo info) {
+        if (trackTransactions && info != null && info.getTransactionId() != null) {
+            ConnectionState cs = findConnectionState(info.getConnectionId());
+            if (cs != null) {
+                cs.addTransactionState(info.getTransactionId());
+                TransactionState state = cs.getTransactionState(info.getTransactionId());
+                state.addCommand(info);
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+        return null;
+    }
+
+    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+        return recordTransactionCommand(info);
+    }
+
+    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+        return recordTransactionCompletion(info);
+    }
+
+    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+        return recordTransactionCompletion(info);
+    }
+
+    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+        return recordTransactionCompletion(info);
+    }
+
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+        return recordTransactionCommand(info);
+    }
+
+    /** @return whether restore re-sends the consumers of each restored session */
+    public boolean isRestoreConsumers() {
+        return restoreConsumers;
+    }
+
+    public void setRestoreConsumers(boolean restoreConsumers) {
+        this.restoreConsumers = restoreConsumers;
+    }
+
+    /** @return whether restore re-sends the producers of each restored session */
+    public boolean isRestoreProducers() {
+        return restoreProducers;
+    }
+
+    public void setRestoreProducers(boolean restoreProducers) {
+        this.restoreProducers = restoreProducers;
+    }
+
+    /** @return whether restore re-sends sessions (and, through them, producers and consumers) */
+    public boolean isRestoreSessions() {
+        return restoreSessions;
+    }
+
+    public void setRestoreSessions(boolean restoreSessions) {
+        this.restoreSessions = restoreSessions;
+    }
+
+    /** @return whether transaction commands and transacted messages are recorded */
+    public boolean isTrackTransactions() {
+        return trackTransactions;
+    }
+
+    public void setTrackTransactions(boolean trackTransactions) {
+        this.trackTransactions = trackTransactions;
+    }
+
+    /** @return whether restore replays the recorded transaction commands */
+    public boolean isRestoreTransaction() {
+        return restoreTransaction;
+    }
+
+    public void setRestoreTransaction(boolean restoreTransaction) {
+        this.restoreTransaction = restoreTransaction;
+    }
+
+    /** @return whether messages outside tracked transactions are cached for replay */
+    public boolean isTrackMessages() {
+        return trackMessages;
+    }
+
+    public void setTrackMessages(boolean trackMessages) {
+        this.trackMessages = trackMessages;
+    }
+
+    /** @return the accounted size above which the message cache evicts its eldest entry */
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.ConsumerInfo;
+
+/**
+ * Holder for the {@link ConsumerInfo} of one tracked consumer.
+ */
+public class ConsumerState {
+    final ConsumerInfo info;
+
+    /**
+     * @param info the consumer info to hold
+     */
+    public ConsumerState(ConsumerInfo info) {
+        this.info = info;
+    }
+
+    /**
+     * @return the consumer info this state was created with
+     */
+    public ConsumerInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * @return the string form of the held consumer info
+     */
+    @Override
+    public String toString() {
+        return info.toString();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.ProducerInfo;
+
+/**
+ * Holder for the {@link ProducerInfo} of one tracked producer.
+ */
+public class ProducerState {
+    final ProducerInfo info;
+
+    /**
+     * @param info the producer info to hold
+     */
+    public ProducerState(ProducerInfo info) {
+        this.info = info;
+    }
+
+    /**
+     * @return the producer info this state was created with
+     */
+    public ProducerInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * @return the string form of the held producer info
+     */
+    @Override
+    public String toString() {
+        return info.toString();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+/**
+ * Tracked state of one session: its {@link SessionInfo} plus the producers
+ * and consumers registered on it, keyed by their ids.
+ * <p>
+ * Once {@link #shutdown()} has been called, adding a producer or consumer
+ * throws IllegalStateException; lookups and removals keep working.
+ */
+public class SessionState {
+    final SessionInfo info;
+
+    private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
+    private final Map<ConsumerId, ConsumerState> consumers = new ConcurrentHashMap<ConsumerId, ConsumerState>();
+    // Set to true by shutdown(); checked before every addition.
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    /**
+     * @param info the session info this state belongs to
+     */
+    public SessionState(SessionInfo info) {
+        this.info = info;
+    }
+
+    /**
+     * @return the string form of the session info
+     */
+    @Override
+    public String toString() {
+        return info.toString();
+    }
+
+    /**
+     * Registers a producer under its producer id, replacing any state
+     * already stored for that id.
+     * 
+     * @param info the producer to register
+     * @throws IllegalStateException if this session state has been shut down
+     */
+    public void addProducer(ProducerInfo info) {
+        checkShutdown();
+        producers.put(info.getProducerId(), new ProducerState(info));
+    }
+
+    /**
+     * @param id the id of the producer to forget
+     * @return the removed state, or null if the id was not registered
+     */
+    public ProducerState removeProducer(ProducerId id) {
+        return producers.remove(id);
+    }
+
+    /**
+     * Registers a consumer under its consumer id, replacing any state
+     * already stored for that id.
+     * 
+     * @param info the consumer to register
+     * @throws IllegalStateException if this session state has been shut down
+     */
+    public void addConsumer(ConsumerInfo info) {
+        checkShutdown();
+        consumers.put(info.getConsumerId(), new ConsumerState(info));
+    }
+
+    /**
+     * @param id the id of the consumer to forget
+     * @return the removed state, or null if the id was not registered
+     */
+    public ConsumerState removeConsumer(ConsumerId id) {
+        return consumers.remove(id);
+    }
+
+    /**
+     * @return the session info this state was created with
+     */
+    public SessionInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * @return a live view of the registered consumer ids
+     */
+    public Set<ConsumerId> getConsumerIds() {
+        return consumers.keySet();
+    }
+
+    /**
+     * @return a live view of the registered producer ids
+     */
+    public Set<ProducerId> getProducerIds() {
+        return producers.keySet();
+    }
+
+    /**
+     * @return a live view of the registered producer states
+     */
+    public Collection<ProducerState> getProducerStates() {
+        return producers.values();
+    }
+
+    /**
+     * @return the state registered for the id, or null if there is none
+     */
+    public ProducerState getProducerState(ProducerId producerId) {
+        return producers.get(producerId);
+    }
+
+    /**
+     * @return a live view of the registered consumer states
+     */
+    public Collection<ConsumerState> getConsumerStates() {
+        return consumers.values();
+    }
+
+    /**
+     * @return the state registered for the id, or null if there is none
+     */
+    public ConsumerState getConsumerState(ConsumerId consumerId) {
+        return consumers.get(consumerId);
+    }
+
+    /** Rejects additions once shutdown() has been called. */
+    private void checkShutdown() {
+        if (shutdown.get()) {
+            throw new IllegalStateException("Disposed");
+        }
+    }
+
+    /**
+     * Marks this session state as disposed; later calls to addProducer or
+     * addConsumer throw IllegalStateException.
+     */
+    public void shutdown() {
+        // The flag must become true: checkShutdown() only rejects additions
+        // when it is set, so storing false here left shutdown() a no-op.
+        shutdown.set(true);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.Response;
+
+public class Tracked extends Response {
+
+    private Runnable runnable;
+
+    public Tracked(Runnable runnable) {
+        this.runnable = runnable;
+    }
+
+    public void onResponses() {
+        if (runnable != null) {
+            runnable.run();
+            runnable = null;
+        }
+    }
+
+    public boolean isWaitingForResponse() {
+        return runnable != null;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Holds the commands recorded for a single transaction together with its
+ * prepare status.
+ * <p>
+ * Once {@link #shutdown()} has been called, {@link #addCommand(Command)}
+ * rejects further commands with an {@link IllegalStateException}.
+ */
+public class TransactionState {
+
+    private final List<Command> commands = new ArrayList<Command>();
+    private final TransactionId id;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private boolean prepared;
+    private int preparedResult;
+
+    /**
+     * @param id identifier of the transaction this state belongs to
+     */
+    public TransactionState(TransactionId id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the string form of the transaction id
+     */
+    public String toString() {
+        return id.toString();
+    }
+
+    /**
+     * Records a command as part of this transaction.
+     *
+     * @param operation the command to record
+     * @throws IllegalStateException if this state has been shut down
+     */
+    public void addCommand(Command operation) {
+        checkShutdown();
+        commands.add(operation);
+    }
+
+    /**
+     * @return the live (not copied) list of recorded commands
+     */
+    public List<Command> getCommands() {
+        return commands;
+    }
+
+    /** Fails fast when the state has already been shut down. */
+    private void checkShutdown() {
+        if (shutdown.get()) {
+            throw new IllegalStateException("Disposed");
+        }
+    }
+
+    /**
+     * Marks this state as shut down so that no further commands are accepted.
+     */
+    public void shutdown() {
+        // The flag must be raised here; it previously stayed false, which
+        // made checkShutdown() a no-op.
+        shutdown.set(true);
+    }
+
+    /**
+     * @return the transaction id given at construction time
+     */
+    public TransactionId getId() {
+        return id;
+    }
+
+    /**
+     * @param prepared whether the transaction has been prepared
+     */
+    public void setPrepared(boolean prepared) {
+        this.prepared = prepared;
+    }
+
+    /**
+     * @return the value last passed to {@link #setPrepared(boolean)}
+     */
+    public boolean isPrepared() {
+        return prepared;
+    }
+
+    /**
+     * @param preparedResult the result code of the prepare step
+     */
+    public void setPreparedResult(int preparedResult) {
+        this.preparedResult = preparedResult;
+    }
+
+    /**
+     * @return the value last passed to {@link #setPreparedResult(int)}
+     */
+    public int getPreparedResult() {
+        return preparedResult;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Negotiates the wire format with a new connection.
+ * <p>
+ * On the first {@link #start()} the local preferred {@link WireFormatInfo} is
+ * sent to the peer. Outgoing commands passed to {@link #oneway(Object)} are
+ * held back until the peer's {@link WireFormatInfo} has been processed by
+ * {@link #negociate(WireFormatInfo)}, an error has been reported, the
+ * transport has been stopped, or the negotiation timeout has expired.
+ */
+public class WireFormatNegotiator extends TransportFilter {
+
+    private static final Log LOG = LogFactory.getLog(WireFormatNegotiator.class);
+
+    private OpenWireFormat wireFormat;
+    private final int minimumVersion;
+    private long negotiateTimeout = 15000L;
+
+    private final AtomicBoolean firstStart = new AtomicBoolean(true);
+    /** Released once negotiation has finished, failed, or the transport stopped. */
+    private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+    /** Released once the attempt to send our own wire format info has completed. */
+    private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+    /**
+     * Negotiator
+     * 
+     * @param next the transport being wrapped
+     * @param wireFormat the wire format to negotiate; its preferred info is
+     *            what gets sent to the peer
+     * @param minimumVersion lowest remote version accepted; values less than
+     *            or equal to 0 are treated as 1
+     */
+    public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
+        super(next);
+        this.wireFormat = wireFormat;
+        if (minimumVersion <= 0) {
+            minimumVersion = 1;
+        }
+        this.minimumVersion = minimumVersion;
+
+        // Setup the initial negotiation timeout to be the same as the initial max inactivity delay specified on the wireformat
+        // Does not make sense for us to take longer.
+        try {
+            if (wireFormat.getPreferedWireFormatInfo() != null) {
+                setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
+            }
+        } catch (IOException e) {
+            // Best effort only: keep the default timeout, but leave a trace
+            // instead of dropping the failure silently.
+            LOG.debug("Could not read the initial max inactivity delay; keeping the default negotiation timeout", e);
+        }
+    }
+
+    /**
+     * Starts the wrapped transport and, on the first start only, sends the
+     * local wire format info.
+     */
+    public void start() throws Exception {
+        super.start();
+        if (firstStart.compareAndSet(true, false)) {
+            sendWireFormat();
+        }
+    }
+
+    /**
+     * Sends the preferred wire format info of the local wire format.
+     * {@link #negociate(WireFormatInfo)} is unblocked afterwards, whether or
+     * not the send succeeded.
+     *
+     * @throws IOException if the info cannot be obtained or sent
+     */
+    public void sendWireFormat() throws IOException {
+        try {
+            WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending: " + info);
+            }
+            sendWireFormat(info);
+        } finally {
+            wireInfoSentDownLatch.countDown();
+        }
+    }
+
+    /**
+     * Stops the wrapped transport and releases any sender still blocked in
+     * {@link #oneway(Object)}.
+     */
+    public void stop() throws Exception {
+        try {
+            super.stop();
+        } finally {
+            // Release waiting senders even when stopping the delegate fails.
+            readyCountDownLatch.countDown();
+        }
+    }
+
+    /**
+     * Forwards the command once negotiation is over.
+     *
+     * @param command the command to send
+     * @throws IOException if the negotiation timeout expires first
+     * @throws InterruptedIOException if the calling thread is interrupted
+     *             while waiting; the interrupt flag is restored
+     */
+    public void oneway(Object command) throws IOException {
+        try {
+            if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
+                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+        super.oneway(command);
+    }
+
+    /**
+     * Intercepts incoming {@link WireFormatInfo} commands to run the
+     * negotiation, then passes every command on to the transport listener.
+     */
+    public void onCommand(Object o) {
+        Command command = (Command)o;
+        if (command.isWireFormatInfo()) {
+            WireFormatInfo info = (WireFormatInfo)command;
+            negociate(info);
+        }
+        getTransportListener().onCommand(command);
+    }
+
+    /**
+     * Applies the peer's wire format info to the local wire format.
+     * <p>
+     * Waits until our own info has been sent, validates the remote info,
+     * renegotiates the wire format and applies the resulting TCP no-delay
+     * setting when the underlying transport exposes a {@link Socket}.
+     * Failures are reported through {@link #onException(IOException)}.
+     *
+     * @param info the wire format info received from the peer
+     */
+    public void negociate(WireFormatInfo info) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received WireFormat: " + info);
+        }
+
+        try {
+            wireInfoSentDownLatch.await();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(this + " before negotiation: " + wireFormat);
+            }
+            // NOTE(review): after reporting an invalid or too-old remote format
+            // the code still falls through to renegotiateWireFormat(); confirm
+            // whether that is intended.
+            if (!info.isValid()) {
+                onException(new IOException("Remote wire format magic is invalid"));
+            } else if (info.getVersion() < minimumVersion) {
+                onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
+            }
+
+            wireFormat.renegotiateWireFormat(info);
+            Socket socket = next.narrow(Socket.class);
+            if (socket != null) {
+                socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(this + " after negotiation: " + wireFormat);
+            }
+
+        } catch (IOException e) {
+            onException(e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag, as oneway() does, so code further up
+            // the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            onException((IOException)new InterruptedIOException().initCause(e));
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+        readyCountDownLatch.countDown();
+        onWireFormatNegotiated(info);
+    }
+
+    /**
+     * Releases blocked senders and then delegates the error to the superclass.
+     */
+    public void onException(IOException error) {
+        readyCountDownLatch.countDown();
+        super.onException(error);
+    }
+
+    /**
+     * @return the string form of the wrapped transport
+     */
+    public String toString() {
+        return next.toString();
+    }
+
+    /**
+     * Sends the given wire format info through the wrapped transport.
+     */
+    protected void sendWireFormat(WireFormatInfo info) throws IOException {
+        next.oneway(info);
+    }
+
+    /**
+     * Hook called at the end of {@link #negociate(WireFormatInfo)}; does
+     * nothing by default.
+     */
+    protected void onWireFormatNegotiated(WireFormatInfo info) {
+    }
+
+    /**
+     * @return how long {@link #oneway(Object)} waits for the negotiation, in
+     *         milliseconds
+     */
+    public long getNegotiateTimeout() {
+        return negotiateTimeout;
+    }
+
+    /**
+     * @param negotiateTimeout how long {@link #oneway(Object)} waits for the
+     *            negotiation, in milliseconds
+     */
+    public void setNegotiateTimeout(long negotiateTimeout) {
+        this.negotiateTimeout = negotiateTimeout;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ * Identify if a limit has been reached
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class DefaultUsageCapacity implements UsageCapacity {
+
+    /** Threshold at or above which a size counts as having hit the limit. */
+    private long limit;
+
+    /**
+     * Compares a size against the configured limit.
+     * 
+     * @param size the size to check
+     * @return true if the size is greater than or equal to the limit
+     * @see org.apache.activemq.usage.UsageCapacity#isLimit(long)
+     */
+    public boolean isLimit(long size) {
+        return limit <= size;
+    }
+
+    /**
+     * @return the currently configured limit
+     */
+    public final long getLimit() {
+        return limit;
+    }
+
+    /**
+     * @param limit the new limit
+     */
+    public final void setLimit(long limit) {
+        this.limit = limit;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ * <p>
+ * The usage counter is guarded by {@code usageMutex}; changes made through
+ * {@link #increaseUsage(long)} and {@link #decreaseUsage(long)} are also
+ * applied to the parent, when there is one.
+ * 
+ * @org.apache.xbean.XBean
+ * @version $Revision: 1.3 $
+ */
+public class MemoryUsage extends Usage<MemoryUsage> {
+
+    /** Current usage; read and written under {@code usageMutex}. */
+    private long usage;
+
+    public MemoryUsage() {
+        this(null, null);
+    }
+
+    /**
+     * Create the memory manager linked to a parent. When the memory manager is
+     * linked to a parent then when usage increased or decreased, the parent's
+     * usage is also increased or decreased.
+     * 
+     * @param parent the parent usage, may be {@code null}
+     */
+    public MemoryUsage(MemoryUsage parent) {
+        this(parent, "default");
+    }
+
+    /**
+     * @param name the name of this usage
+     */
+    public MemoryUsage(String name) {
+        this(null, name);
+    }
+
+    /**
+     * Creates a usage that takes the whole (portion 1.0) of its parent's limit.
+     */
+    public MemoryUsage(MemoryUsage parent, String name) {
+        this(parent, name, 1.0f);
+    }
+
+    /**
+     * @param parent the parent usage, may be {@code null}
+     * @param name the name of this usage
+     * @param portion the fraction of the parent's limit used as this limit
+     */
+    public MemoryUsage(MemoryUsage parent, String name, float portion) {
+        super(parent, name, portion);
+    }
+
+    /**
+     * Blocks until neither the parent chain nor this usage is full.
+     * 
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public void waitForSpace() throws InterruptedException {
+        if (parent != null) {
+            parent.waitForSpace();
+        }
+        synchronized (usageMutex) {
+            // Loop guards against wake-ups that do not correspond to free space.
+            while (percentUsage >= 100) {
+                usageMutex.wait();
+            }
+        }
+    }
+
+    /**
+     * Waits for space, giving up after the timeout.
+     * 
+     * @param timeout maximum time to wait in milliseconds, applied to the
+     *            parent and to this usage in turn
+     * @throws InterruptedException if interrupted while waiting
+     * @return true if space
+     */
+    public boolean waitForSpace(long timeout) throws InterruptedException {
+        if (parent != null) {
+            if (!parent.waitForSpace(timeout)) {
+                return false;
+            }
+        }
+        synchronized (usageMutex) {
+            // NOTE(review): a single un-looped wait returns on any notifyAll,
+            // so this may report false before the timeout has elapsed.
+            if (percentUsage >= 100) {
+                usageMutex.wait(timeout);
+            }
+            return percentUsage < 100;
+        }
+    }
+
+    /**
+     * @return true if the parent or this usage is at or above 100 percent
+     */
+    public boolean isFull() {
+        if (parent != null && parent.isFull()) {
+            return true;
+        }
+        synchronized (usageMutex) {
+            return percentUsage >= 100;
+        }
+    }
+
+    /**
+     * Tries to increase the usage by value amount but blocks if this object is
+     * currently full.
+     * 
+     * @param value the amount to add
+     * @throws InterruptedException if interrupted while waiting for space
+     */
+    public void enqueueUsage(long value) throws InterruptedException {
+        waitForSpace();
+        increaseUsage(value);
+    }
+
+    /**
+     * Increases the usage by the value amount.
+     * 
+     * @param value the amount to add; 0 is a no-op
+     */
+    public void increaseUsage(long value) {
+        if (value == 0) {
+            return;
+        }
+        int newPercentUsage;
+        synchronized (usageMutex) {
+            usage += value;
+            newPercentUsage = caclPercentUsage();
+        }
+        setPercentUsage(newPercentUsage);
+        if (parent != null) {
+            parent.increaseUsage(value);
+        }
+    }
+
+    /**
+     * Decreases the usage by the value amount.
+     * 
+     * @param value the amount to subtract; 0 is a no-op
+     */
+    public void decreaseUsage(long value) {
+        if (value == 0) {
+            return;
+        }
+        int newPercentUsage;
+        synchronized (usageMutex) {
+            usage -= value;
+            newPercentUsage = caclPercentUsage();
+        }
+        setPercentUsage(newPercentUsage);
+        if (parent != null) {
+            parent.decreaseUsage(value);
+        }
+    }
+
+    /**
+     * @return the raw usage counter, read without taking the mutex
+     */
+    protected long retrieveUsage() {
+        return usage;
+    }
+
+    /**
+     * @return the current usage
+     */
+    public long getUsage() {
+        // Read under the same mutex that guards every write to the counter.
+        synchronized (usageMutex) {
+            return usage;
+        }
+    }
+
+    /**
+     * Overwrites the usage counter. The percentage is not recalculated here.
+     * 
+     * @param usage the new usage value
+     */
+    public void setUsage(long usage) {
+        synchronized (usageMutex) {
+            this.usage = usage;
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ * <p>
+ * A usage may have a parent; its limit can then be expressed as a portion of
+ * the parent's limit. State shared between threads is guarded by
+ * {@code usageMutex}.
+ * 
+ * @org.apache.xbean.XBean
+ * @version $Revision: 1.3 $
+ */
+public abstract class Usage<T extends Usage> implements Service {
+
+    private static final Log LOG = LogFactory.getLog(Usage.class);
+    protected final Object usageMutex = new Object();
+    protected int percentUsage;
+    protected T parent;
+    private UsageCapacity limiter = new DefaultUsageCapacity();
+    private int percentUsageMinDelta = 1;
+    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
+    private final boolean debug = LOG.isDebugEnabled();
+    private String name;
+    private float usagePortion = 1.0f;
+    private final List<T> children = new CopyOnWriteArrayList<T>();
+    /** Callbacks waiting for the usage to drop below 100 percent. */
+    private final List<Runnable> callbacks = new LinkedList<Runnable>();
+    private int pollingTime = 100;
+    private volatile ThreadPoolExecutor executor;
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    /**
+     * @param parent the parent usage, may be {@code null}
+     * @param name the name; prefixed with the parent's name and a colon when
+     *            a parent is given
+     * @param portion the fraction of the parent's limit used as the initial
+     *            limit when a parent is given
+     */
+    public Usage(T parent, String name, float portion) {
+        this.parent = parent;
+        this.usagePortion = portion;
+        if (parent != null) {
+            this.limiter.setLimit((long)(parent.getLimit() * portion));
+            name = parent.name + ":" + name;
+        }
+        this.name = name;
+    }
+
+    /**
+     * @return the current raw usage, in the same unit as the limit
+     */
+    protected abstract long retrieveUsage();
+
+    /**
+     * Waits without a timeout until there is space.
+     * 
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public void waitForSpace() throws InterruptedException {
+        waitForSpace(0);
+    }
+
+    /**
+     * Waits until the parent chain and this usage are below 100 percent, or
+     * until the timeout expires. The usage is re-evaluated every
+     * {@code pollingTime} milliseconds, or earlier when the mutex is notified.
+     * 
+     * @param timeout maximum time to wait in milliseconds; values less than
+     *            or equal to 0 wait without a deadline
+     * @throws InterruptedException if interrupted while waiting
+     * @return true if space
+     */
+    public boolean waitForSpace(long timeout) throws InterruptedException {
+        if (parent != null) {
+            if (!parent.waitForSpace(timeout)) {
+                return false;
+            }
+        }
+        synchronized (usageMutex) {
+            percentUsage = caclPercentUsage();
+            if (percentUsage >= 100) {
+                long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+                long timeleft = timeout > 0 ? timeout : Long.MAX_VALUE;
+                while (percentUsage >= 100 && timeleft > 0) {
+                    // Never sleep past the caller's deadline.
+                    usageMutex.wait(Math.min(pollingTime, timeleft));
+                    // Recompute after every wait so the result is never stale.
+                    percentUsage = caclPercentUsage();
+                    timeleft = deadline - System.currentTimeMillis();
+                }
+            }
+            return percentUsage < 100;
+        }
+    }
+
+    /**
+     * @return true if the parent or this usage is at or above 100 percent
+     */
+    public boolean isFull() {
+        if (parent != null && parent.isFull()) {
+            return true;
+        }
+        synchronized (usageMutex) {
+            percentUsage = caclPercentUsage();
+            return percentUsage >= 100;
+        }
+    }
+
+    public void addUsageListener(UsageListener listener) {
+        listeners.add(listener);
+    }
+
+    public void removeUsageListener(UsageListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * @return the limit held by the current limiter
+     */
+    public long getLimit() {
+        synchronized (usageMutex) {
+            return limiter.getLimit();
+        }
+    }
+
+    /**
+     * Sets the memory limit in bytes. Setting the limit in bytes will set the
+     * usagePortion to 0 since the UsageManager is not going to be portion based
+     * off the parent. When set using XBean, you can use values such as: "20
+     * mb", "1024 kb", or "1 gb"
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
+    public void setLimit(long limit) {
+        synchronized (usageMutex) {
+            this.limiter.setLimit(limit);
+            this.usagePortion = 0;
+        }
+        onLimitChange();
+    }
+
+    /**
+     * Recomputes the limit from the parent when this usage is portion based,
+     * refreshes the percentage and then propagates the change to all children.
+     */
+    protected void onLimitChange() {
+        // We may need to calculate the limit
+        if (usagePortion > 0 && parent != null) {
+            synchronized (usageMutex) {
+                this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
+            }
+        }
+        // Reset the percent currently being used.
+        int newPercentUsage;
+        synchronized (usageMutex) {
+            newPercentUsage = caclPercentUsage();
+        }
+        setPercentUsage(newPercentUsage);
+        // Let the children know that the limit has changed. They may need to
+        // set their limits based on ours.
+        for (T child : children) {
+            child.onLimitChange();
+        }
+    }
+
+    public float getUsagePortion() {
+        synchronized (usageMutex) {
+            return usagePortion;
+        }
+    }
+
+    /**
+     * Sets the portion of the parent's limit to use and re-derives the limit.
+     */
+    public void setUsagePortion(float usagePortion) {
+        synchronized (usageMutex) {
+            this.usagePortion = usagePortion;
+        }
+        onLimitChange();
+    }
+
+    public int getPercentUsage() {
+        synchronized (usageMutex) {
+            return percentUsage;
+        }
+    }
+
+    public int getPercentUsageMinDelta() {
+        synchronized (usageMutex) {
+            return percentUsageMinDelta;
+        }
+    }
+
+    /**
+     * Sets the minimum number of percentage points the usage has to change
+     * before a UsageListener event is fired by the manager.
+     * 
+     * @param percentUsageMinDelta the step size, at least 1
+     * @throws IllegalArgumentException if the value is less than 1
+     */
+    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
+        if (percentUsageMinDelta < 1) {
+            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
+        }
+        int newPercentUsage;
+        synchronized (usageMutex) {
+            this.percentUsageMinDelta = percentUsageMinDelta;
+            newPercentUsage = caclPercentUsage();
+        }
+        setPercentUsage(newPercentUsage);
+    }
+
+    public long getUsage() {
+        synchronized (usageMutex) {
+            return retrieveUsage();
+        }
+    }
+
+    /**
+     * Stores the new percentage and fires a change event when it differs from
+     * the previous one.
+     */
+    protected void setPercentUsage(int value) {
+        synchronized (usageMutex) {
+            int oldValue = percentUsage;
+            percentUsage = value;
+            if (oldValue != value) {
+                fireEvent(oldValue, value);
+            }
+        }
+    }
+
+    /**
+     * @return the usage as a percentage of the limit, rounded down to a
+     *         multiple of percentUsageMinDelta; 0 when the limit is 0
+     */
+    protected int caclPercentUsage() {
+        if (limiter.getLimit() == 0) {
+            return 0;
+        }
+        return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
+    }
+
+    /**
+     * Publishes a percentage change. Nothing is dispatched unless this usage
+     * is started. On a full to not-full transition waiting threads are woken
+     * and queued callbacks are handed to the executor; listeners are always
+     * notified through the executor.
+     */
+    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
+        if (debug) {
+            LOG.debug("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
+        }
+
+        if (started.get()) {
+            // Switching from being full to not being full..
+            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+                synchronized (usageMutex) {
+                    usageMutex.notifyAll();
+                    for (Runnable callback : new ArrayList<Runnable>(callbacks)) {
+                        getExecutor().execute(callback);
+                    }
+                    callbacks.clear();
+                }
+            }
+            // Let the listeners know on a separate thread
+            Runnable listenerNotifier = new Runnable() {
+
+                public void run() {
+                    for (UsageListener l : listeners) {
+                        l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+                    }
+                }
+
+            };
+            // Checked again because stop() may have run in the meantime.
+            if (started.get()) {
+                getExecutor().execute(listenerNotifier);
+            } else {
+                LOG.warn("not notifying usage change to listeners on shutdown");
+            }
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String toString() {
+        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
+    }
+
+    /**
+     * Starts this usage once: registers it with the parent, if any, and starts
+     * all children.
+     */
+    @SuppressWarnings("unchecked")
+    public void start() {
+        if (started.compareAndSet(false, true)) {
+            if (parent != null) {
+                parent.addChild(this);
+            }
+            for (T t : children) {
+                t.start();
+            }
+        }
+    }
+
+    /**
+     * Stops this usage once: unregisters from the parent, shuts the executor
+     * down, wakes waiting threads, runs the queued callbacks on the calling
+     * thread and stops all children.
+     */
+    @SuppressWarnings("unchecked")
+    public void stop() {
+        if (started.compareAndSet(true, false)) {
+            if (parent != null) {
+                parent.removeChild(this);
+            }
+            if (this.executor != null) {
+                this.executor.shutdownNow();
+            }
+            //clear down any callbacks
+            synchronized (usageMutex) {
+                usageMutex.notifyAll();
+                // Iterate over a copy: a callback may register new callbacks.
+                for (Runnable callback : new ArrayList<Runnable>(this.callbacks)) {
+                    callback.run();
+                }
+                this.callbacks.clear();
+            }
+            for (T t : children) {
+                t.stop();
+            }
+        }
+    }
+
+    private void addChild(T child) {
+        children.add(child);
+        if (started.get()) {
+            child.start();
+        }
+    }
+
+    private void removeChild(T child) {
+        children.remove(child);
+    }
+
+    /**
+     * @param callback the task to run once there is space again
+     * @return true if the UsageManager was full. The callback will only be
+     *         called if this method returns true.
+     */
+    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
+        if (parent != null) {
+            // Wrapper queued on the parent: when the parent frees up, either
+            // run the callback or re-queue it here if this usage is still full.
+            Runnable r = new Runnable() {
+
+                public void run() {
+                    synchronized (usageMutex) {
+                        if (percentUsage >= 100) {
+                            callbacks.add(callback);
+                        } else {
+                            callback.run();
+                        }
+                    }
+                }
+            };
+            if (parent.notifyCallbackWhenNotFull(r)) {
+                return true;
+            }
+        }
+        synchronized (usageMutex) {
+            if (percentUsage >= 100) {
+                callbacks.add(callback);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    /**
+     * @return the limiter
+     */
+    public UsageCapacity getLimiter() {
+        return this.limiter;
+    }
+
+    /**
+     * @param limiter the limiter to set
+     */
+    public void setLimiter(UsageCapacity limiter) {
+        this.limiter = limiter;
+    }
+
+    /**
+     * @return the pollingTime
+     */
+    public int getPollingTime() {
+        return this.pollingTime;
+    }
+
+    /**
+     * @param pollingTime the pollingTime to set
+     */
+    public void setPollingTime(int pollingTime) {
+        this.pollingTime = pollingTime;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public T getParent() {
+        return parent;
+    }
+
+    public void setParent(T parent) {
+        this.parent = parent;
+    }
+
+    /**
+     * Lazily creates the executor used to run callbacks and listener
+     * notifications: one daemon thread fed from an unbounded queue.
+     */
+    protected Executor getExecutor() {
+        if (this.executor == null) {
+            synchronized (usageMutex) {
+                if (this.executor == null) {
+                    this.executor = new ThreadPoolExecutor(1, 1, 0,
+                            TimeUnit.NANOSECONDS,
+                            new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                                public Thread newThread(Runnable runnable) {
+                                    Thread thread = new Thread(runnable, getName()
+                                            + " Usage Thread Pool");
+                                    thread.setDaemon(true);
+                                    return thread;
+                                }
+                            });
+                }
+            }
+        }
+        return this.executor;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ * A capacity with a configurable limit that can report whether a given size
+ * has reached that limit.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public interface UsageCapacity {
+
+    /**
+     * Returns the configured limit.
+     *
+     * @return the limit
+     */
+    long getLimit();
+
+    /**
+     * Changes the configured limit.
+     *
+     * @param limit the limit to set
+     */
+    void setLimit(long limit);
+
+    /**
+     * Reports whether the limit has been reached for the given size.
+     *
+     * @param size the size to check against the limit
+     * @return true if the limit has been reached
+     */
+    boolean isLimit(long size);
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+/**
+ * Callback interface with a single notification method,
+ * {@link #onUsageChanged(Usage, int, int)}.
+ */
+public interface UsageListener {
+    /**
+     * NOTE(review): presumably invoked when the percent usage of
+     * {@code usage} moves from {@code oldPercentUsage} to
+     * {@code newPercentUsage}; the calling code is not visible here --
+     * confirm against the caller.
+     *
+     * @param usage the usage the notification is about
+     * @param oldPercentUsage the previous percent usage
+     * @param newPercentUsage the new percent usage
+     */
+    void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage);
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Static helpers that marshal and unmarshal "primitive" values -- boxed Java
+ * primitives, strings, byte arrays and (nested) maps and lists of those -- to
+ * and from a data stream. Every value is preceded by a one byte type tag (one
+ * of the <code>*_TYPE</code> constants, or {@link #NULL}).
+ * <p>
+ * Also provides {@link #writeUTF8(DataOutput, String)} and
+ * {@link #readUTF8(DataInput)}, a variant of the
+ * {@link DataOutput#writeUTF(String)} encoding that uses a four byte length
+ * prefix instead of a two byte one. Some older JVM's UTF8 encoding function
+ * breaks when handling large strings.
+ *
+ * @version $Revision$
+ */
+public final class MarshallingSupport {
+
+    public static final byte NULL = 0;
+    public static final byte BOOLEAN_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte CHAR_TYPE = 3;
+    public static final byte SHORT_TYPE = 4;
+    public static final byte INTEGER_TYPE = 5;
+    public static final byte LONG_TYPE = 6;
+    public static final byte DOUBLE_TYPE = 7;
+    public static final byte FLOAT_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BYTE_ARRAY_TYPE = 10;
+    public static final byte MAP_TYPE = 11;
+    public static final byte LIST_TYPE = 12;
+    public static final byte BIG_STRING_TYPE = 13;
+
+    /**
+     * Upper bound on the capacity pre-allocated for a collection whose element
+     * count was read from the stream. The count is not trusted: a larger
+     * collection simply grows while its elements are actually being read.
+     */
+    private static final int MAX_PREALLOCATED_ELEMENTS = 1024;
+
+    /**
+     * Encoding written by {@link Properties#store(java.io.OutputStream, String)}
+     * and expected by {@link Properties#load(java.io.InputStream)}.
+     */
+    private static final String PROPERTIES_ENCODING = "ISO-8859-1";
+
+    private MarshallingSupport() {
+    }
+
+    /**
+     * Writes a map of primitive values: the entry count as an int, then for
+     * each entry the key (via <code>writeUTF</code>) followed by the tagged
+     * value. A <code>null</code> map is written as the count <code>-1</code>.
+     *
+     * @param map the map to write, may be <code>null</code>; keys must be strings
+     * @param out the stream to write to
+     * @throws IOException if writing fails or a value is not a primitive
+     * @throws ClassCastException if a key is not a {@link String}
+     */
+    public static void marshalPrimitiveMap(Map<?, ?> map, DataOutputStream out) throws IOException {
+        if (map == null) {
+            out.writeInt(-1);
+            return;
+        }
+        out.writeInt(map.size());
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            out.writeUTF((String)entry.getKey());
+            marshalPrimitive(out, entry.getValue());
+        }
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap(Map, DataOutputStream)}
+     * without limiting the number of entries.
+     *
+     * @param in the stream to read from
+     * @return the map, or <code>null</code> if a negative entry count was read
+     * @throws IOException if reading fails or the data is malformed
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap(Map, DataOutputStream)}.
+     *
+     * @param in the stream to read from
+     * @param maxPropertySize the largest entry count that is accepted
+     * @return the map, or <code>null</code> if a negative entry count was read
+     * @throws IOException if the entry count exceeds <code>maxPropertySize</code>,
+     *                 reading fails or the data is malformed
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+        int size = in.readInt();
+        if (size > maxPropertySize) {
+            throw new IOException("Primitive map is larger than the allowed size: " + size);
+        }
+        if (size < 0) {
+            return null;
+        }
+        // Do not size the table from the untrusted count; let it grow instead.
+        Map<String, Object> rc = new HashMap<String, Object>(Math.min(size, MAX_PREALLOCATED_ELEMENTS));
+        for (int i = 0; i < size; i++) {
+            String name = in.readUTF();
+            rc.put(name, unmarshalPrimitive(in));
+        }
+        return rc;
+    }
+
+    /**
+     * Writes a list of primitive values: the element count as an int followed
+     * by each tagged element.
+     *
+     * @param list the list to write; must not be <code>null</code>
+     * @param out the stream to write to
+     * @throws IOException if writing fails or an element is not a primitive
+     */
+    public static void marshalPrimitiveList(List<?> list, DataOutputStream out) throws IOException {
+        out.writeInt(list.size());
+        for (Object element : list) {
+            marshalPrimitive(out, element);
+        }
+    }
+
+    /**
+     * Reads a list written by {@link #marshalPrimitiveList(List, DataOutputStream)}.
+     *
+     * @param in the stream to read from
+     * @return the list of values that was read
+     * @throws IOException if the element count is negative, reading fails or
+     *                 the data is malformed
+     */
+    public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+        int size = in.readInt();
+        if (size < 0) {
+            throw new IOException("Primitive list has a negative size: " + size);
+        }
+        // Do not size the list from the untrusted count; let it grow instead.
+        List<Object> answer = new ArrayList<Object>(Math.min(size, MAX_PREALLOCATED_ELEMENTS));
+        for (int i = 0; i < size; i++) {
+            answer.add(unmarshalPrimitive(in));
+        }
+        return answer;
+    }
+
+    /**
+     * Writes a single value preceded by its type tag. Supported are
+     * <code>null</code>, the boxed primitives, {@link String},
+     * <code>byte[]</code>, and {@link Map} / {@link List} instances whose
+     * contents are themselves supported.
+     *
+     * @param out the stream to write to
+     * @param value the value to write, may be <code>null</code>
+     * @throws IOException if writing fails or the value has an unsupported type
+     */
+    public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+        if (value == null) {
+            marshalNull(out);
+            return;
+        }
+        Class<?> type = value.getClass();
+        if (type == Boolean.class) {
+            marshalBoolean(out, ((Boolean)value).booleanValue());
+        } else if (type == Byte.class) {
+            marshalByte(out, ((Byte)value).byteValue());
+        } else if (type == Character.class) {
+            marshalChar(out, ((Character)value).charValue());
+        } else if (type == Short.class) {
+            marshalShort(out, ((Short)value).shortValue());
+        } else if (type == Integer.class) {
+            marshalInt(out, ((Integer)value).intValue());
+        } else if (type == Long.class) {
+            marshalLong(out, ((Long)value).longValue());
+        } else if (type == Float.class) {
+            marshalFloat(out, ((Float)value).floatValue());
+        } else if (type == Double.class) {
+            marshalDouble(out, ((Double)value).doubleValue());
+        } else if (type == byte[].class) {
+            marshalByteArray(out, (byte[])value);
+        } else if (type == String.class) {
+            marshalString(out, (String)value);
+        } else if (value instanceof Map) {
+            out.writeByte(MAP_TYPE);
+            marshalPrimitiveMap((Map<?, ?>)value, out);
+        } else if (value instanceof List) {
+            out.writeByte(LIST_TYPE);
+            marshalPrimitiveList((List<?>)value, out);
+        } else {
+            throw new IOException("Object is not a primitive: " + value);
+        }
+    }
+
+    /**
+     * Reads a single value written by
+     * {@link #marshalPrimitive(DataOutputStream, Object)}. Nested maps are read
+     * without an entry count limit.
+     *
+     * @param in the stream to read from
+     * @return the value, possibly <code>null</code>
+     * @throws IOException if reading fails, the type tag is unknown or a byte
+     *                 array length is negative
+     */
+    public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+        Object value = null;
+        byte type = in.readByte();
+        switch (type) {
+        case BYTE_TYPE:
+            value = Byte.valueOf(in.readByte());
+            break;
+        case BOOLEAN_TYPE:
+            value = Boolean.valueOf(in.readBoolean());
+            break;
+        case CHAR_TYPE:
+            value = Character.valueOf(in.readChar());
+            break;
+        case SHORT_TYPE:
+            value = Short.valueOf(in.readShort());
+            break;
+        case INTEGER_TYPE:
+            value = Integer.valueOf(in.readInt());
+            break;
+        case LONG_TYPE:
+            value = Long.valueOf(in.readLong());
+            break;
+        case FLOAT_TYPE:
+            value = Float.valueOf(in.readFloat());
+            break;
+        case DOUBLE_TYPE:
+            value = Double.valueOf(in.readDouble());
+            break;
+        case BYTE_ARRAY_TYPE: {
+            int length = in.readInt();
+            if (length < 0) {
+                // Report corrupt input as an I/O problem rather than a NegativeArraySizeException.
+                throw new IOException("Byte array has a negative length: " + length);
+            }
+            byte[] data = new byte[length];
+            in.readFully(data);
+            value = data;
+            break;
+        }
+        case STRING_TYPE:
+            value = in.readUTF();
+            break;
+        case BIG_STRING_TYPE:
+            value = readUTF8(in);
+            break;
+        case MAP_TYPE:
+            value = unmarshalPrimitiveMap(in);
+            break;
+        case LIST_TYPE:
+            value = unmarshalPrimitiveList(in);
+            break;
+        case NULL:
+            value = null;
+            break;
+        default:
+            throw new IOException("Unknown primitive type: " + type);
+        }
+        return value;
+    }
+
+    /** Writes the {@link #NULL} tag. */
+    public static void marshalNull(DataOutputStream out) throws IOException {
+        out.writeByte(NULL);
+    }
+
+    /** Writes the {@link #BOOLEAN_TYPE} tag followed by the value. */
+    public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+        out.writeByte(BOOLEAN_TYPE);
+        out.writeBoolean(value);
+    }
+
+    /** Writes the {@link #BYTE_TYPE} tag followed by the value. */
+    public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+        out.writeByte(BYTE_TYPE);
+        out.writeByte(value);
+    }
+
+    /** Writes the {@link #CHAR_TYPE} tag followed by the value. */
+    public static void marshalChar(DataOutputStream out, char value) throws IOException {
+        out.writeByte(CHAR_TYPE);
+        out.writeChar(value);
+    }
+
+    /** Writes the {@link #SHORT_TYPE} tag followed by the value. */
+    public static void marshalShort(DataOutputStream out, short value) throws IOException {
+        out.writeByte(SHORT_TYPE);
+        out.writeShort(value);
+    }
+
+    /** Writes the {@link #INTEGER_TYPE} tag followed by the value. */
+    public static void marshalInt(DataOutputStream out, int value) throws IOException {
+        out.writeByte(INTEGER_TYPE);
+        out.writeInt(value);
+    }
+
+    /** Writes the {@link #LONG_TYPE} tag followed by the value. */
+    public static void marshalLong(DataOutputStream out, long value) throws IOException {
+        out.writeByte(LONG_TYPE);
+        out.writeLong(value);
+    }
+
+    /** Writes the {@link #FLOAT_TYPE} tag followed by the value. */
+    public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+        out.writeByte(FLOAT_TYPE);
+        out.writeFloat(value);
+    }
+
+    /** Writes the {@link #DOUBLE_TYPE} tag followed by the value. */
+    public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+        out.writeByte(DOUBLE_TYPE);
+        out.writeDouble(value);
+    }
+
+    /** Writes the whole array; see {@link #marshalByteArray(DataOutputStream, byte[], int, int)}. */
+    public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+        marshalByteArray(out, value, 0, value.length);
+    }
+
+    /**
+     * Writes the {@link #BYTE_ARRAY_TYPE} tag, the length as an int and then
+     * <code>length</code> bytes of <code>value</code> starting at
+     * <code>offset</code>.
+     */
+    public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+        out.writeByte(BYTE_ARRAY_TYPE);
+        out.writeInt(length);
+        out.write(value, offset, length);
+    }
+
+    /**
+     * Writes a string, choosing the encoding by length: strings shorter than
+     * <code>Short.MAX_VALUE / 4</code> characters are tagged
+     * {@link #STRING_TYPE} and written with <code>writeUTF</code>, longer ones
+     * are tagged {@link #BIG_STRING_TYPE} and written with
+     * {@link #writeUTF8(DataOutput, String)}.
+     *
+     * @param out the stream to write to
+     * @param s the string to write; must not be <code>null</code>
+     * @throws IOException if writing fails
+     */
+    public static void marshalString(DataOutputStream out, String s) throws IOException {
+        // If it's too big, out.writeUTF may not be able to write it out.
+        if (s.length() < Short.MAX_VALUE / 4) {
+            out.writeByte(STRING_TYPE);
+            out.writeUTF(s);
+        } else {
+            out.writeByte(BIG_STRING_TYPE);
+            writeUTF8(out, s);
+        }
+    }
+
+    /**
+     * Writes a string as a four byte big-endian length followed by that many
+     * encoded bytes. Characters 0x0001-0x007F take one byte, characters above
+     * 0x07FF take three bytes and all others (including 0x0000) take two. A
+     * <code>null</code> string is written as the length <code>-1</code>.
+     *
+     * @param dataOut the output to write to
+     * @param text the string to write, may be <code>null</code>
+     * @throws UTFDataFormatException if the encoded form does not fit the
+     *                 four byte length prefix
+     * @throws IOException if writing fails
+     */
+    public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+        if (text == null) {
+            dataOut.writeInt(-1);
+            return;
+        }
+
+        int strlen = text.length();
+        char[] charr = new char[strlen];
+        text.getChars(0, strlen, charr, 0);
+
+        // Counted as a long: up to three bytes per char can exceed the int range.
+        long encodedLength = 0;
+        for (int i = 0; i < strlen; i++) {
+            int c = charr[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                encodedLength++;
+            } else if (c > 0x07FF) {
+                encodedLength += 3;
+            } else {
+                encodedLength += 2;
+            }
+        }
+        if (encodedLength > Integer.MAX_VALUE - 4) {
+            throw new UTFDataFormatException("encoded string too long: " + encodedLength + " bytes");
+        }
+
+        int utflen = (int)encodedLength;
+        byte[] bytearr = new byte[utflen + 4];
+        int count = 0;
+        // Unlike DataOutputStream.writeUTF the length prefix is four bytes wide.
+        bytearr[count++] = (byte)((utflen >>> 24) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 16) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+        for (int i = 0; i < strlen; i++) {
+            int c = charr[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte)c;
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            } else {
+                bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+        dataOut.write(bytearr);
+    }
+
+    /**
+     * Reads a string written by {@link #writeUTF8(DataOutput, String)}.
+     *
+     * @param dataIn the input to read from
+     * @return the decoded string, or <code>null</code> if a negative length
+     *         was read
+     * @throws UTFDataFormatException if the bytes are not a valid encoding
+     * @throws IOException if reading fails
+     */
+    public static String readUTF8(DataInput dataIn) throws IOException {
+        int utflen = dataIn.readInt(); // four byte length, unlike DataInput.readUTF
+        if (utflen < 0) {
+            return null;
+        }
+
+        StringBuilder str = new StringBuilder(utflen);
+        byte[] bytearr = new byte[utflen];
+        int c;
+        int char2;
+        int char3;
+        int count = 0;
+
+        dataIn.readFully(bytearr, 0, utflen);
+
+        while (count < utflen) {
+            c = bytearr[count] & 0xff;
+            switch (c >> 4) {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                /* 0xxxxxxx */
+                count++;
+                str.append((char)c);
+                break;
+            case 12:
+            case 13:
+                /* 110x xxxx 10xx xxxx */
+                count += 2;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytearr[count - 1];
+                if ((char2 & 0xC0) != 0x80) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+                break;
+            case 14:
+                /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                count += 3;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytearr[count - 2];
+                char3 = bytearr[count - 1];
+                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+                break;
+            default:
+                /* 10xx xxxx, 1111 xxxx */
+                throw new UTFDataFormatException();
+            }
+        }
+        // The number of chars produced may be less than utflen
+        return str.toString();
+    }
+
+    /**
+     * Renders properties in the format of
+     * {@link Properties#store(java.io.OutputStream, String)}.
+     *
+     * @param props the properties to render, may be <code>null</code>
+     * @return the stored form, or an empty string if <code>props</code> is
+     *         <code>null</code>
+     * @throws IOException if storing fails
+     */
+    public static String propertiesToString(Properties props) throws IOException {
+        if (props == null) {
+            return "";
+        }
+        DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+        try {
+            props.store(dataOut, "");
+            // store() writes ISO-8859-1; decode with that rather than the platform default.
+            return new String(dataOut.getData(), 0, dataOut.size(), PROPERTIES_ENCODING);
+        } finally {
+            dataOut.close();
+        }
+    }
+
+    /**
+     * Parses a string in the format read by
+     * {@link Properties#load(java.io.InputStream)}.
+     *
+     * @param str the text to parse, may be <code>null</code> or empty
+     * @return the parsed properties; empty if <code>str</code> is
+     *         <code>null</code> or empty
+     * @throws IOException if loading fails
+     */
+    public static Properties stringToProperties(String str) throws IOException {
+        Properties result = new Properties();
+        if (str != null && str.length() > 0) {
+            // load() reads ISO-8859-1; encode with that rather than the platform default.
+            DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes(PROPERTIES_ENCODING));
+            try {
+                result.load(dataIn);
+            } finally {
+                dataIn.close();
+            }
+        }
+        return result;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java
------------------------------------------------------------------------------
    svn:executable = *