You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC
svn commit: r780773 [30/31] - in /activemq/sandbox/activemq-flow:
activemq-client/ activemq-client/src/main/java/org/
activemq-client/src/main/java/org/apache/
activemq-client/src/main/java/org/apache/activemq/
activemq-client/src/main/java/org/apache/...
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tracks the state of a connection so a newly established transport can be
+ * re-initialized to the state that was tracked.
+ *
+ * @version $Revision$
+ */
+public class ConnectionStateTracker extends CommandVisitorAdapter {
+ private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
+
+ private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+ protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
+
+ private boolean trackTransactions;
+ private boolean restoreSessions = true;
+ private boolean restoreConsumers = true;
+ private boolean restoreProducers = true;
+ private boolean restoreTransaction = true;
+ private boolean trackMessages = true;
+ private int maxCacheSize = 128 * 1024;
+ private int currentCacheSize;
+ private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
+ protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
+ boolean result = currentCacheSize > maxCacheSize;
+ if (result) {
+ currentCacheSize -= eldest.getValue().getSize();
+ }
+ return result;
+ }
+ };
+
+
+ private class RemoveTransactionAction implements Runnable {
+ private final TransactionInfo info;
+
+ public RemoveTransactionAction(TransactionInfo info) {
+ this.info = info;
+ }
+
+ public void run() {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = connectionStates.get(connectionId);
+ cs.removeTransactionState(info.getTransactionId());
+ }
+ }
+
+ /**
+ *
+ *
+ * @param command
+ * @return null if the command is not state tracked.
+ * @throws IOException
+ */
+ public Tracked track(Command command) throws IOException {
+ try {
+ return (Tracked)command.visit(this);
+ } catch (IOException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ public void trackBack(Command command) {
+ if (trackMessages && command != null && command.isMessage()) {
+ Message message = (Message) command;
+ if (message.getTransactionId()==null) {
+ currentCacheSize = currentCacheSize + message.getSize();
+ }
+ }
+ }
+
+ public void restore(Transport transport) throws IOException {
+ // Restore the connections.
+ for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
+ ConnectionState connectionState = iter.next();
+ transport.oneway(connectionState.getInfo());
+ restoreTempDestinations(transport, connectionState);
+
+ if (restoreSessions) {
+ restoreSessions(transport, connectionState);
+ }
+
+ if (restoreTransaction) {
+ restoreTransactions(transport, connectionState);
+ }
+ }
+ //now flush messages
+ for (Message msg:messageCache.values()) {
+ transport.oneway(msg);
+ }
+ }
+
+ private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+ for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
+ TransactionState transactionState = (TransactionState)iter.next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tx: " + transactionState.getId());
+ }
+ for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
+ Command command = (Command)iterator.next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tx replay: " + command);
+ }
+ transport.oneway(command);
+ }
+ }
+ }
+
+ /**
+ * @param transport
+ * @param connectionState
+ * @throws IOException
+ */
+ protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
+ // Restore the connection's sessions
+ for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
+ SessionState sessionState = (SessionState)iter2.next();
+ transport.oneway(sessionState.getInfo());
+
+ if (restoreProducers) {
+ restoreProducers(transport, sessionState);
+ }
+
+ if (restoreConsumers) {
+ restoreConsumers(transport, sessionState);
+ }
+ }
+ }
+
+ /**
+ * @param transport
+ * @param sessionState
+ * @throws IOException
+ */
+ protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
+ // Restore the session's consumers
+ for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
+ ConsumerState consumerState = (ConsumerState)iter3.next();
+ transport.oneway(consumerState.getInfo());
+ }
+ }
+
+ /**
+ * @param transport
+ * @param sessionState
+ * @throws IOException
+ */
+ protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
+ // Restore the session's producers
+ for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
+ ProducerState producerState = (ProducerState)iter3.next();
+ transport.oneway(producerState.getInfo());
+ }
+ }
+
+ /**
+ * @param transport
+ * @param connectionState
+ * @throws IOException
+ */
+ protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
+ throws IOException {
+ // Restore the connection's temp destinations.
+ for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
+ transport.oneway((DestinationInfo)iter2.next());
+ }
+ }
+
+ public Response processAddDestination(DestinationInfo info) {
+ if (info != null) {
+ ConnectionState cs = connectionStates.get(info.getConnectionId());
+ if (cs != null && info.getDestination().isTemporary()) {
+ cs.addTempDestination(info);
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processRemoveDestination(DestinationInfo info) {
+ if (info != null) {
+ ConnectionState cs = connectionStates.get(info.getConnectionId());
+ if (cs != null && info.getDestination().isTemporary()) {
+ cs.removeTempDestination(info.getDestination());
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processAddProducer(ProducerInfo info) {
+ if (info != null && info.getProducerId() != null) {
+ SessionId sessionId = info.getProducerId().getParentId();
+ if (sessionId != null) {
+ ConnectionId connectionId = sessionId.getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss != null) {
+ ss.addProducer(info);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processRemoveProducer(ProducerId id) {
+ if (id != null) {
+ SessionId sessionId = id.getParentId();
+ if (sessionId != null) {
+ ConnectionId connectionId = sessionId.getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss != null) {
+ ss.removeProducer(id);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processAddConsumer(ConsumerInfo info) {
+ if (info != null) {
+ SessionId sessionId = info.getConsumerId().getParentId();
+ if (sessionId != null) {
+ ConnectionId connectionId = sessionId.getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss != null) {
+ ss.addConsumer(info);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
+ if (id != null) {
+ SessionId sessionId = id.getParentId();
+ if (sessionId != null) {
+ ConnectionId connectionId = sessionId.getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss != null) {
+ ss.removeConsumer(id);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processAddSession(SessionInfo info) {
+ if (info != null) {
+ ConnectionId connectionId = info.getSessionId().getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ cs.addSession(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
+ if (id != null) {
+ ConnectionId connectionId = id.getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ cs.removeSession(id);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processAddConnection(ConnectionInfo info) {
+ if (info != null) {
+ connectionStates.put(info.getConnectionId(), new ConnectionState(info));
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+ if (id != null) {
+ connectionStates.remove(id);
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public Response processMessage(Message send) throws Exception {
+ if (send != null) {
+ if (trackTransactions && send.getTransactionId() != null) {
+ ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(send);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }else if (trackMessages) {
+ messageCache.put(send.getMessageId(), send.copy());
+ }
+ }
+ return null;
+ }
+
+ public Response processBeginTransaction(TransactionInfo info) {
+ if (trackTransactions && info != null && info.getTransactionId() != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ cs.addTransactionState(info.getTransactionId());
+ TransactionState state = cs.getTransactionState(info.getTransactionId());
+ state.addCommand(info);
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ if (trackTransactions && info != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ if (trackTransactions && info != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ if (trackTransactions && info != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ if (trackTransactions && info != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ if (trackTransactions && info != null) {
+ ConnectionId connectionId = info.getConnectionId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public boolean isRestoreConsumers() {
+ return restoreConsumers;
+ }
+
+ public void setRestoreConsumers(boolean restoreConsumers) {
+ this.restoreConsumers = restoreConsumers;
+ }
+
+ public boolean isRestoreProducers() {
+ return restoreProducers;
+ }
+
+ public void setRestoreProducers(boolean restoreProducers) {
+ this.restoreProducers = restoreProducers;
+ }
+
+ public boolean isRestoreSessions() {
+ return restoreSessions;
+ }
+
+ public void setRestoreSessions(boolean restoreSessions) {
+ this.restoreSessions = restoreSessions;
+ }
+
+ public boolean isTrackTransactions() {
+ return trackTransactions;
+ }
+
+ public void setTrackTransactions(boolean trackTransactions) {
+ this.trackTransactions = trackTransactions;
+ }
+
+ public boolean isRestoreTransaction() {
+ return restoreTransaction;
+ }
+
+ public void setRestoreTransaction(boolean restoreTransaction) {
+ this.restoreTransaction = restoreTransaction;
+ }
+
+ public boolean isTrackMessages() {
+ return trackMessages;
+ }
+
+ public void setTrackMessages(boolean trackMessages) {
+ this.trackMessages = trackMessages;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.ConsumerInfo;
+
+public class ConsumerState {
+ final ConsumerInfo info;
+
+ public ConsumerState(ConsumerInfo info) {
+ this.info = info;
+ }
+ public String toString() {
+ return info.toString();
+ }
+ public ConsumerInfo getInfo() {
+ return info;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ConsumerState.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.ProducerInfo;
+
+public class ProducerState {
+ final ProducerInfo info;
+
+ public ProducerState(ProducerInfo info) {
+ this.info = info;
+ }
+
+ public String toString() {
+ return info.toString();
+ }
+
+ public ProducerInfo getInfo() {
+ return info;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/ProducerState.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class SessionState {
+ final SessionInfo info;
+
+ private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
+ private final Map<ConsumerId, ConsumerState> consumers = new ConcurrentHashMap<ConsumerId, ConsumerState>();
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ public SessionState(SessionInfo info) {
+ this.info = info;
+ }
+
+ public String toString() {
+ return info.toString();
+ }
+
+ public void addProducer(ProducerInfo info) {
+ checkShutdown();
+ producers.put(info.getProducerId(), new ProducerState(info));
+ }
+
+ public ProducerState removeProducer(ProducerId id) {
+ return producers.remove(id);
+ }
+
+ public void addConsumer(ConsumerInfo info) {
+ checkShutdown();
+ consumers.put(info.getConsumerId(), new ConsumerState(info));
+ }
+
+ public ConsumerState removeConsumer(ConsumerId id) {
+ return consumers.remove(id);
+ }
+
+ public SessionInfo getInfo() {
+ return info;
+ }
+
+ public Set<ConsumerId> getConsumerIds() {
+ return consumers.keySet();
+ }
+
+ public Set<ProducerId> getProducerIds() {
+ return producers.keySet();
+ }
+
+ public Collection<ProducerState> getProducerStates() {
+ return producers.values();
+ }
+
+ public ProducerState getProducerState(ProducerId producerId) {
+ return producers.get(producerId);
+ }
+
+ public Collection<ConsumerState> getConsumerStates() {
+ return consumers.values();
+ }
+
+ public ConsumerState getConsumerState(ConsumerId consumerId) {
+ return consumers.get(consumerId);
+ }
+
+ private void checkShutdown() {
+ if (shutdown.get()) {
+ throw new IllegalStateException("Disposed");
+ }
+ }
+
+ public void shutdown() {
+ shutdown.set(false);
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/SessionState.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/Tracked.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.Response;
+
+public class Tracked extends Response {
+
+ private Runnable runnable;
+
+ public Tracked(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public void onResponses() {
+ if (runnable != null) {
+ runnable.run();
+ runnable = null;
+ }
+ }
+
+ public boolean isWaitingForResponse() {
+ return runnable != null;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/TransactionState.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.TransactionId;
+
+public class TransactionState {
+
+ private final List<Command> commands = new ArrayList<Command>();
+ private final TransactionId id;
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private boolean prepared;
+ private int preparedResult;
+
+ public TransactionState(TransactionId id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return id.toString();
+ }
+
+ public void addCommand(Command operation) {
+ checkShutdown();
+ commands.add(operation);
+ }
+
+ public List<Command> getCommands() {
+ return commands;
+ }
+
+ private void checkShutdown() {
+ if (shutdown.get()) {
+ throw new IllegalStateException("Disposed");
+ }
+ }
+
+ public void shutdown() {
+ shutdown.set(false);
+ }
+
+ public TransactionId getId() {
+ return id;
+ }
+
+ public void setPrepared(boolean prepared) {
+ this.prepared = prepared;
+ }
+
+ public boolean isPrepared() {
+ return prepared;
+ }
+
+ public void setPreparedResult(int preparedResult) {
+ this.preparedResult = preparedResult;
+ }
+
+ public int getPreparedResult() {
+ return preparedResult;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Negotiates the wire format with a new connection
+ */
+public class WireFormatNegotiator extends TransportFilter {
+
+ private static final Log LOG = LogFactory.getLog(WireFormatNegotiator.class);
+
+ private OpenWireFormat wireFormat;
+ private final int minimumVersion;
+ private long negotiateTimeout = 15000L;
+
+ private final AtomicBoolean firstStart = new AtomicBoolean(true);
+ private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+ private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+ /**
+ * Negotiator
+ *
+ * @param next
+ */
+ public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
+ super(next);
+ this.wireFormat = wireFormat;
+ if (minimumVersion <= 0) {
+ minimumVersion = 1;
+ }
+ this.minimumVersion = minimumVersion;
+
+ // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
+ // Does not make sense for us to take longer.
+ try {
+ if( wireFormat.getPreferedWireFormatInfo() !=null ) {
+ setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
+ }
+ } catch (IOException e) {
+ }
+ }
+
+ public void start() throws Exception {
+ super.start();
+ if (firstStart.compareAndSet(true, false)) {
+ sendWireFormat();
+ }
+ }
+
+ public void sendWireFormat() throws IOException {
+ try {
+ WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending: " + info);
+ }
+ sendWireFormat(info);
+ } finally {
+ wireInfoSentDownLatch.countDown();
+ }
+ }
+
+ public void stop() throws Exception {
+ super.stop();
+ readyCountDownLatch.countDown();
+ }
+
+ public void oneway(Object command) throws IOException {
+ try {
+ if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
+ throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ super.oneway(command);
+ }
+
+ public void onCommand(Object o) {
+ Command command = (Command)o;
+ if (command.isWireFormatInfo()) {
+ WireFormatInfo info = (WireFormatInfo)command;
+ negociate(info);
+ }
+ getTransportListener().onCommand(command);
+ }
+
+ public void negociate(WireFormatInfo info) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received WireFormat: " + info);
+ }
+
+ try {
+ wireInfoSentDownLatch.await();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + " before negotiation: " + wireFormat);
+ }
+ if (!info.isValid()) {
+ onException(new IOException("Remote wire format magic is invalid"));
+ } else if (info.getVersion() < minimumVersion) {
+ onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
+ }
+
+ wireFormat.renegotiateWireFormat(info);
+ Socket socket = next.narrow(Socket.class);
+ if (socket != null) {
+ socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + " after negotiation: " + wireFormat);
+ }
+
+ } catch (IOException e) {
+ onException(e);
+ } catch (InterruptedException e) {
+ onException((IOException)new InterruptedIOException().initCause(e));
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ readyCountDownLatch.countDown();
+ onWireFormatNegotiated(info);
+ }
+
+ public void onException(IOException error) {
+ readyCountDownLatch.countDown();
+ /*
+ * try { super.oneway(new ExceptionResponse(error)); } catch
+ * (IOException e) { // ignore as we are already throwing an exception }
+ */
+ super.onException(error);
+ }
+
+ public String toString() {
+ return next.toString();
+ }
+
+ protected void sendWireFormat(WireFormatInfo info) throws IOException {
+ next.oneway(info);
+ }
+
+ protected void onWireFormatNegotiated(WireFormatInfo info) {
+ }
+
+ public long getNegotiateTimeout() {
+ return negotiateTimeout;
+ }
+
+ public void setNegotiateTimeout(long negotiateTimeout) {
+ this.negotiateTimeout = negotiateTimeout;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ Identify if a limit has been reached
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public class DefaultUsageCapacity implements UsageCapacity{
+
+ private long limit;
+
+ /**
+ * @param size
+ * @return true if the limit is reached
+ * @see org.apache.activemq.usage.UsageCapacity#isLimit(long)
+ */
+ public boolean isLimit(long size) {
+ return size >= limit;
+ }
+
+
+ /**
+ * @return the limit
+ */
+ public final long getLimit(){
+ return this.limit;
+ }
+
+
+ /**
+ * @param limit the limit to set
+ */
+ public final void setLimit(long limit){
+ this.limit=limit;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision: 1.3 $
+ */
+public class MemoryUsage extends Usage<MemoryUsage> {
+
+ private long usage;
+
+ public MemoryUsage() {
+ this(null, null);
+ }
+
+ /**
+ * Create the memory manager linked to a parent. When the memory manager is
+ * linked to a parent then when usage increased or decreased, the parent's
+ * usage is also increased or decreased.
+ *
+ * @param parent
+ */
+ public MemoryUsage(MemoryUsage parent) {
+ this(parent, "default");
+ }
+
+ public MemoryUsage(String name) {
+ this(null, name);
+ }
+
+ public MemoryUsage(MemoryUsage parent, String name) {
+ this(parent, name, 1.0f);
+ }
+
+ public MemoryUsage(MemoryUsage parent, String name, float portion) {
+ super(parent, name, portion);
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ public void waitForSpace() throws InterruptedException {
+ if (parent != null) {
+ parent.waitForSpace();
+ }
+ synchronized (usageMutex) {
+ for (int i = 0; percentUsage >= 100; i++) {
+ usageMutex.wait();
+ }
+ }
+ }
+
+ /**
+ * @param timeout
+ * @throws InterruptedException
+ * @return true if space
+ */
+ public boolean waitForSpace(long timeout) throws InterruptedException {
+ if (parent != null) {
+ if (!parent.waitForSpace(timeout)) {
+ return false;
+ }
+ }
+ synchronized (usageMutex) {
+ if (percentUsage >= 100) {
+ usageMutex.wait(timeout);
+ }
+ return percentUsage < 100;
+ }
+ }
+
+ public boolean isFull() {
+ if (parent != null && parent.isFull()) {
+ return true;
+ }
+ synchronized (usageMutex) {
+ return percentUsage >= 100;
+ }
+ }
+
+ /**
+ * Tries to increase the usage by value amount but blocks if this object is
+ * currently full.
+ *
+ * @param value
+ * @throws InterruptedException
+ */
+ public void enqueueUsage(long value) throws InterruptedException {
+ waitForSpace();
+ increaseUsage(value);
+ }
+
+ /**
+ * Increases the usage by the value amount.
+ *
+ * @param value
+ */
+ public void increaseUsage(long value) {
+ if (value == 0) {
+ return;
+ }
+ int percentUsage;
+ synchronized (usageMutex) {
+ usage += value;
+ percentUsage = caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ if (parent != null) {
+ ((MemoryUsage)parent).increaseUsage(value);
+ }
+ }
+
+ /**
+ * Decreases the usage by the value amount.
+ *
+ * @param value
+ */
+ public void decreaseUsage(long value) {
+ if (value == 0) {
+ return;
+ }
+ int percentUsage;
+ synchronized (usageMutex) {
+ usage -= value;
+ percentUsage = caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ if (parent != null) {
+ parent.decreaseUsage(value);
+ }
+ }
+
+ protected long retrieveUsage() {
+ return usage;
+ }
+
+ public long getUsage() {
+ return usage;
+ }
+
+ public void setUsage(long usage) {
+ this.usage = usage;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision: 1.3 $
+ */
+public abstract class Usage<T extends Usage> implements Service {
+
+ private static final Log LOG = LogFactory.getLog(Usage.class);
+ protected final Object usageMutex = new Object();
+ protected int percentUsage;
+ protected T parent;
+ private UsageCapacity limiter = new DefaultUsageCapacity();
+ private int percentUsageMinDelta = 1;
+ private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
+ private final boolean debug = LOG.isDebugEnabled();
+ private String name;
+ private float usagePortion = 1.0f;
+ private List<T> children = new CopyOnWriteArrayList<T>();
+ private final List<Runnable> callbacks = new LinkedList<Runnable>();
+ private int pollingTime = 100;
+ private volatile ThreadPoolExecutor executor;
+ private AtomicBoolean started=new AtomicBoolean();
+
+ public Usage(T parent, String name, float portion) {
+ this.parent = parent;
+ this.usagePortion = portion;
+ if (parent != null) {
+ this.limiter.setLimit((long)(parent.getLimit() * portion));
+ name = parent.name + ":" + name;
+ }
+ this.name = name;
+ }
+
+ protected abstract long retrieveUsage();
+
+ /**
+ * @throws InterruptedException
+ */
+ public void waitForSpace() throws InterruptedException {
+ waitForSpace(0);
+ }
+
+ /**
+ * @param timeout
+ * @throws InterruptedException
+ * @return true if space
+ */
+ public boolean waitForSpace(long timeout) throws InterruptedException {
+ if (parent != null) {
+ if (!parent.waitForSpace(timeout)) {
+ return false;
+ }
+ }
+ synchronized (usageMutex) {
+ percentUsage=caclPercentUsage();
+ if (percentUsage >= 100) {
+ long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+ long timeleft = deadline;
+ while (timeleft > 0) {
+ percentUsage=caclPercentUsage();
+ if (percentUsage >= 100) {
+ usageMutex.wait(pollingTime);
+ timeleft = deadline - System.currentTimeMillis();
+ } else {
+ break;
+ }
+ }
+ }
+ return percentUsage < 100;
+ }
+ }
+
+ public boolean isFull() {
+ if (parent != null && parent.isFull()) {
+ return true;
+ }
+ synchronized (usageMutex) {
+ percentUsage=caclPercentUsage();
+ return percentUsage >= 100;
+ }
+ }
+
+ public void addUsageListener(UsageListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeUsageListener(UsageListener listener) {
+ listeners.remove(listener);
+ }
+
+ public long getLimit() {
+ synchronized (usageMutex) {
+ return limiter.getLimit();
+ }
+ }
+
+ /**
+ * Sets the memory limit in bytes. Setting the limit in bytes will set the
+ * usagePortion to 0 since the UsageManager is not going to be portion based
+ * off the parent. When set using XBean, you can use values such as: "20
+ * mb", "1024 kb", or "1 gb"
+ *
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+ */
+ public void setLimit(long limit) {
+ if (percentUsageMinDelta < 0) {
+ throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
+ }
+ synchronized (usageMutex) {
+ this.limiter.setLimit(limit);
+ this.usagePortion = 0;
+ }
+ onLimitChange();
+ }
+
+ protected void onLimitChange() {
+ // We may need to calculate the limit
+ if (usagePortion > 0 && parent != null) {
+ synchronized (usageMutex) {
+ this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
+ }
+ }
+ // Reset the percent currently being used.
+ int percentUsage;
+ synchronized (usageMutex) {
+ percentUsage = caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ // Let the children know that the limit has changed. They may need to
+ // set
+ // their limits based on ours.
+ for (T child : children) {
+ child.onLimitChange();
+ }
+ }
+
+ public float getUsagePortion() {
+ synchronized (usageMutex) {
+ return usagePortion;
+ }
+ }
+
+ public void setUsagePortion(float usagePortion) {
+ synchronized (usageMutex) {
+ this.usagePortion = usagePortion;
+ }
+ onLimitChange();
+ }
+
+ public int getPercentUsage() {
+ synchronized (usageMutex) {
+ return percentUsage;
+ }
+ }
+
+ public int getPercentUsageMinDelta() {
+ synchronized (usageMutex) {
+ return percentUsageMinDelta;
+ }
+ }
+
+ /**
+ * Sets the minimum number of percentage points the usage has to change
+ * before a UsageListener event is fired by the manager.
+ *
+ * @param percentUsageMinDelta
+ */
+ public void setPercentUsageMinDelta(int percentUsageMinDelta) {
+ if (percentUsageMinDelta < 1) {
+ throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
+ }
+ int percentUsage;
+ synchronized (usageMutex) {
+ this.percentUsageMinDelta = percentUsageMinDelta;
+ percentUsage = caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ }
+
+ public long getUsage() {
+ synchronized (usageMutex) {
+ return retrieveUsage();
+ }
+ }
+
+ protected void setPercentUsage(int value) {
+ synchronized (usageMutex) {
+ int oldValue = percentUsage;
+ percentUsage = value;
+ if (oldValue != value) {
+ fireEvent(oldValue, value);
+ }
+ }
+ }
+
+ protected int caclPercentUsage() {
+ if (limiter.getLimit() == 0) {
+ return 0;
+ }
+ return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
+ }
+
+ private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
+ if (debug) {
+ LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
+ }
+
+ if (started.get()) {
+ // Switching from being full to not being full..
+ if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+ synchronized (usageMutex) {
+ usageMutex.notifyAll();
+ for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
+ Runnable callback = iter.next();
+ getExecutor().execute(callback);
+ }
+ callbacks.clear();
+ }
+ }
+ // Let the listeners know on a separate thread
+ Runnable listenerNotifier = new Runnable() {
+
+ public void run() {
+ for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
+ UsageListener l = iter.next();
+ l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+ }
+ }
+
+ };
+ if (started.get()) {
+ getExecutor().execute(listenerNotifier);
+ } else {
+ LOG.warn("not notifying usage change to listeners on shutdown");
+ }
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String toString() {
+ return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
+ }
+
+ @SuppressWarnings("unchecked")
+ public void start() {
+ if (started.compareAndSet(false, true)){
+ if (parent != null) {
+ parent.addChild(this);
+ }
+ for (T t:children) {
+ t.start();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void stop() {
+ if (started.compareAndSet(true, false)){
+ if (parent != null) {
+ parent.removeChild(this);
+ }
+ if (this.executor != null){
+ this.executor.shutdownNow();
+ }
+ //clear down any callbacks
+ synchronized (usageMutex) {
+ usageMutex.notifyAll();
+ for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
+ Runnable callback = iter.next();
+ callback.run();
+ }
+ this.callbacks.clear();
+ }
+ for (T t:children) {
+ t.stop();
+ }
+ }
+ }
+
+ private void addChild(T child) {
+ children.add(child);
+ if (started.get()) {
+ child.start();
+ }
+ }
+
+ private void removeChild(T child) {
+ children.remove(child);
+ }
+
+ /**
+ * @param callback
+ * @return true if the UsageManager was full. The callback will only be
+ * called if this method returns true.
+ */
+ public boolean notifyCallbackWhenNotFull(final Runnable callback) {
+ if (parent != null) {
+ Runnable r = new Runnable() {
+
+ public void run() {
+ synchronized (usageMutex) {
+ if (percentUsage >= 100) {
+ callbacks.add(callback);
+ } else {
+ callback.run();
+ }
+ }
+ }
+ };
+ if (parent.notifyCallbackWhenNotFull(r)) {
+ return true;
+ }
+ }
+ synchronized (usageMutex) {
+ if (percentUsage >= 100) {
+ callbacks.add(callback);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * @return the limiter
+ */
+ public UsageCapacity getLimiter() {
+ return this.limiter;
+ }
+
+ /**
+ * @param limiter the limiter to set
+ */
+ public void setLimiter(UsageCapacity limiter) {
+ this.limiter = limiter;
+ }
+
+ /**
+ * @return the pollingTime
+ */
+ public int getPollingTime() {
+ return this.pollingTime;
+ }
+
+ /**
+ * @param pollingTime the pollingTime to set
+ */
+ public void setPollingTime(int pollingTime) {
+ this.pollingTime = pollingTime;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public T getParent() {
+ return parent;
+ }
+
+ public void setParent(T parent) {
+ this.parent = parent;
+ }
+
+ protected Executor getExecutor() {
+ if (this.executor == null) {
+ synchronized(usageMutex) {
+ if (this.executor == null) {
+ this.executor = new ThreadPoolExecutor(1, 1, 0,
+ TimeUnit.NANOSECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, getName()
+ + " Usage Thread Pool");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
+ }
+ }
+ return this.executor;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/Usage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ Identify if a limit has been reached
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public interface UsageCapacity{
+
+ /**
+ * Has the limit been reached ?
+ *
+ * @param size
+ * @return true if it has
+ */
+ boolean isLimit(long size);
+
+
+ /**
+ * @return the limit
+ */
+ long getLimit();
+
+ /**
+ * @param limit the limit to set
+ */
+ void setLimit(long limit);
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageCapacity.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+public interface UsageListener {
+ void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage);
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/usage/UsageListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * The fixed version of the UTF8 encoding function. Some older JVM's UTF8
+ * encoding function breaks when handling large strings.
+ *
+ * @version $Revision$
+ */
+public final class MarshallingSupport {
+
+ public static final byte NULL = 0;
+ public static final byte BOOLEAN_TYPE = 1;
+ public static final byte BYTE_TYPE = 2;
+ public static final byte CHAR_TYPE = 3;
+ public static final byte SHORT_TYPE = 4;
+ public static final byte INTEGER_TYPE = 5;
+ public static final byte LONG_TYPE = 6;
+ public static final byte DOUBLE_TYPE = 7;
+ public static final byte FLOAT_TYPE = 8;
+ public static final byte STRING_TYPE = 9;
+ public static final byte BYTE_ARRAY_TYPE = 10;
+ public static final byte MAP_TYPE = 11;
+ public static final byte LIST_TYPE = 12;
+ public static final byte BIG_STRING_TYPE = 13;
+
+ private MarshallingSupport() {
+ }
+
+ public static void marshalPrimitiveMap(Map map, DataOutputStream out) throws IOException {
+ if (map == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(map.size());
+ for (Iterator iter = map.keySet().iterator(); iter.hasNext();) {
+ String name = (String)iter.next();
+ out.writeUTF(name);
+ Object value = map.get(name);
+ marshalPrimitive(out, value);
+ }
+ }
+ }
+
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+ return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+ }
+
+ /**
+ * @param in
+ * @return
+ * @throws IOException
+ * @throws IOException
+ */
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+ int size = in.readInt();
+ if (size > maxPropertySize) {
+ throw new IOException("Primitive map is larger than the allowed size: " + size);
+ }
+ if (size < 0) {
+ return null;
+ } else {
+ Map<String, Object> rc = new HashMap<String, Object>(size);
+ for (int i = 0; i < size; i++) {
+ String name = in.readUTF();
+ rc.put(name, unmarshalPrimitive(in));
+ }
+ return rc;
+ }
+
+ }
+
+ public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+ out.writeInt(list.size());
+ for (Iterator iter = list.iterator(); iter.hasNext();) {
+ Object element = (Object)iter.next();
+ marshalPrimitive(out, element);
+ }
+ }
+
+ public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+ int size = in.readInt();
+ List<Object> answer = new ArrayList<Object>(size);
+ while (size-- > 0) {
+ answer.add(unmarshalPrimitive(in));
+ }
+ return answer;
+ }
+
+ public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+ if (value == null) {
+ marshalNull(out);
+ } else if (value.getClass() == Boolean.class) {
+ marshalBoolean(out, ((Boolean)value).booleanValue());
+ } else if (value.getClass() == Byte.class) {
+ marshalByte(out, ((Byte)value).byteValue());
+ } else if (value.getClass() == Character.class) {
+ marshalChar(out, ((Character)value).charValue());
+ } else if (value.getClass() == Short.class) {
+ marshalShort(out, ((Short)value).shortValue());
+ } else if (value.getClass() == Integer.class) {
+ marshalInt(out, ((Integer)value).intValue());
+ } else if (value.getClass() == Long.class) {
+ marshalLong(out, ((Long)value).longValue());
+ } else if (value.getClass() == Float.class) {
+ marshalFloat(out, ((Float)value).floatValue());
+ } else if (value.getClass() == Double.class) {
+ marshalDouble(out, ((Double)value).doubleValue());
+ } else if (value.getClass() == byte[].class) {
+ marshalByteArray(out, (byte[])value);
+ } else if (value.getClass() == String.class) {
+ marshalString(out, (String)value);
+ } else if (value instanceof Map) {
+ out.writeByte(MAP_TYPE);
+ marshalPrimitiveMap((Map)value, out);
+ } else if (value instanceof List) {
+ out.writeByte(LIST_TYPE);
+ marshalPrimitiveList((List)value, out);
+ } else {
+ throw new IOException("Object is not a primitive: " + value);
+ }
+ }
+
+ public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+ Object value = null;
+ byte type = in.readByte();
+ switch (type) {
+ case BYTE_TYPE:
+ value = Byte.valueOf(in.readByte());
+ break;
+ case BOOLEAN_TYPE:
+ value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ break;
+ case CHAR_TYPE:
+ value = Character.valueOf(in.readChar());
+ break;
+ case SHORT_TYPE:
+ value = Short.valueOf(in.readShort());
+ break;
+ case INTEGER_TYPE:
+ value = Integer.valueOf(in.readInt());
+ break;
+ case LONG_TYPE:
+ value = Long.valueOf(in.readLong());
+ break;
+ case FLOAT_TYPE:
+ value = new Float(in.readFloat());
+ break;
+ case DOUBLE_TYPE:
+ value = new Double(in.readDouble());
+ break;
+ case BYTE_ARRAY_TYPE:
+ value = new byte[in.readInt()];
+ in.readFully((byte[])value);
+ break;
+ case STRING_TYPE:
+ value = in.readUTF();
+ break;
+ case BIG_STRING_TYPE:
+ value = readUTF8(in);
+ break;
+ case MAP_TYPE:
+ value = unmarshalPrimitiveMap(in);
+ break;
+ case LIST_TYPE:
+ value = unmarshalPrimitiveList(in);
+ break;
+ case NULL:
+ value = null;
+ break;
+ default:
+ throw new IOException("Unknown primitive type: " + type);
+ }
+ return value;
+ }
+
+ public static void marshalNull(DataOutputStream out) throws IOException {
+ out.writeByte(NULL);
+ }
+
+ public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+ out.writeByte(BOOLEAN_TYPE);
+ out.writeBoolean(value);
+ }
+
+ public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+ out.writeByte(BYTE_TYPE);
+ out.writeByte(value);
+ }
+
+ public static void marshalChar(DataOutputStream out, char value) throws IOException {
+ out.writeByte(CHAR_TYPE);
+ out.writeChar(value);
+ }
+
+ public static void marshalShort(DataOutputStream out, short value) throws IOException {
+ out.writeByte(SHORT_TYPE);
+ out.writeShort(value);
+ }
+
+ public static void marshalInt(DataOutputStream out, int value) throws IOException {
+ out.writeByte(INTEGER_TYPE);
+ out.writeInt(value);
+ }
+
+ public static void marshalLong(DataOutputStream out, long value) throws IOException {
+ out.writeByte(LONG_TYPE);
+ out.writeLong(value);
+ }
+
+ public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+ out.writeByte(FLOAT_TYPE);
+ out.writeFloat(value);
+ }
+
+ public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+ out.writeByte(DOUBLE_TYPE);
+ out.writeDouble(value);
+ }
+
+ public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+ marshalByteArray(out, value, 0, value.length);
+ }
+
+ public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+ out.writeByte(BYTE_ARRAY_TYPE);
+ out.writeInt(length);
+ out.write(value, offset, length);
+ }
+
+ public static void marshalString(DataOutputStream out, String s) throws IOException {
+ // If it's too big, out.writeUTF may not able able to write it out.
+ if (s.length() < Short.MAX_VALUE / 4) {
+ out.writeByte(STRING_TYPE);
+ out.writeUTF(s);
+ } else {
+ out.writeByte(BIG_STRING_TYPE);
+ writeUTF8(out, s);
+ }
+ }
+
+ public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+ if (text != null) {
+ int strlen = text.length();
+ int utflen = 0;
+ char[] charr = new char[strlen];
+ int c = 0;
+ int count = 0;
+
+ text.getChars(0, strlen, charr, 0);
+
+ for (int i = 0; i < strlen; i++) {
+ c = charr[i];
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+ // TODO diff: Sun code - removed
+ byte[] bytearr = new byte[utflen + 4]; // TODO diff: Sun code
+ bytearr[count++] = (byte)((utflen >>> 24) & 0xFF); // TODO diff:
+ // Sun code
+ bytearr[count++] = (byte)((utflen >>> 16) & 0xFF); // TODO diff:
+ // Sun code
+ bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+ for (int i = 0; i < strlen; i++) {
+ c = charr[i];
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte)c;
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ dataOut.write(bytearr);
+
+ } else {
+ dataOut.writeInt(-1);
+ }
+ }
+
+ public static String readUTF8(DataInput dataIn) throws IOException {
+ int utflen = dataIn.readInt(); // TODO diff: Sun code
+ if (utflen > -1) {
+ StringBuffer str = new StringBuffer(utflen);
+ byte bytearr[] = new byte[utflen];
+ int c;
+ int char2;
+ int char3;
+ int count = 0;
+
+ dataIn.readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ str.append((char)c);
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 2]; // TODO diff: Sun code
+ char3 = bytearr[count - 1]; // TODO diff: Sun code
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException();
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(str);
+ } else {
+ return null;
+ }
+ }
+
+ public static String propertiesToString(Properties props) throws IOException {
+ String result = "";
+ if (props != null) {
+ DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+ props.store(dataOut, "");
+ result = new String(dataOut.getData(), 0, dataOut.size());
+ dataOut.close();
+ }
+ return result;
+ }
+
+ public static Properties stringToProperties(String str) throws IOException {
+ Properties result = new Properties();
+ if (str != null && str.length() > 0) {
+ DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes());
+ result.load(dataIn);
+ dataIn.close();
+ }
+ return result;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/util/MarshallingSupport.java
------------------------------------------------------------------------------
svn:executable = *