You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/03 12:30:26 UTC
svn commit: r514131 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/ft/
main/java/org/apache/activemq/broker/regio...
Author: rajdavies
Date: Sat Mar 3 03:30:22 2007
New Revision: 514131
URL: http://svn.apache.org/viewvc?view=rev&rev=514131
Log:
performance tuning
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sat Mar 3 03:30:22 2007
@@ -1585,6 +1585,9 @@
message.setJMSMessageID(msg.getMessageId().toString());
}
msg.setTransactionId(txid);
+ if(connection.isCopyMessageOnSend()){
+ msg=(ActiveMQMessage)msg.copy();
+ }
msg.setConnection(connection);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Sat Mar 3 03:30:22 2007
@@ -22,6 +22,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@@ -265,11 +266,13 @@
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
-
boolean originalFlowControl = context.isProducerFlowControl();
+ final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setConnectionContext(context);
+ producerExchange.setMutable(true);
try {
context.setProducerFlowControl(false);
- next.send(context, advisoryMessage);
+ next.send(producerExchange, advisoryMessage);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java Sat Mar 3 03:30:22 2007
@@ -43,11 +43,11 @@
super(next);
}
- public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
- next.acknowledge(context,ack);
+ public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+ next.acknowledge(consumerExchange,ack);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
- brokers[i].acknowledge(context,ack);
+ brokers[i].acknowledge(consumerExchange,ack);
}
}
@@ -134,11 +134,11 @@
}
}
- public void send(ConnectionContext context,Message messageSend) throws Exception{
- next.send(context,messageSend);
+ public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception{
+ next.send(producerExchange,messageSend);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
- brokers[i].send(context,messageSend);
+ brokers[i].send(producerExchange,messageSend);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sat Mar 3 03:30:22 2007
@@ -70,8 +70,8 @@
return next.getDestinations(destination);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
- next.acknowledge(context, ack);
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+ next.acknowledge(consumerExchange, ack);
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
@@ -122,8 +122,8 @@
next.rollbackTransaction(context, xid);
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
- next.send(context, messageSend);
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ next.send(producerExchange, messageSend);
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java Sat Mar 3 03:30:22 2007
@@ -81,7 +81,7 @@
/**
*
*/
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
ActiveMQDestination destination = message.getDestination();
if( destination.isComposite() ) {
ActiveMQDestination[] destinations = destination.getCompositeDestinations();
@@ -92,10 +92,10 @@
message.setOriginalDestination(destination);
message.setDestination(destinations[i]);
message.evictMarshlledForm();
- next.send(context, message);
+ next.send(producerExchange, message);
}
} else {
- next.send(context, message);
+ next.send(producerExchange, message);
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java?view=auto&rev=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java Sat Mar 3 03:30:22 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * Holds internal state in the broker for a MessageConsumer
+ *
+ * @version $Revision: 1.8 $
+ */
+public class ConsumerBrokerExchange{
+
+ private ConnectionContext connectionContext;
+ private Destination regionDestination;
+ private Region region;
+ private Subscription subscription;
+
+ /**
+ * @return the connectionContext
+ */
+ public ConnectionContext getConnectionContext(){
+ return this.connectionContext;
+ }
+
+ /**
+ * @param connectionContext the connectionContext to set
+ */
+ public void setConnectionContext(ConnectionContext connectionContext){
+ this.connectionContext=connectionContext;
+ }
+
+ /**
+ * @return the region
+ */
+ public Region getRegion(){
+ return this.region;
+ }
+
+ /**
+ * @param region the region to set
+ */
+ public void setRegion(Region region){
+ this.region=region;
+ }
+
+ /**
+ * @return the regionDestination
+ */
+ public Destination getRegionDestination(){
+ return this.regionDestination;
+ }
+
+ /**
+ * @param regionDestination the regionDestination to set
+ */
+ public void setRegionDestination(Destination regionDestination){
+ this.regionDestination=regionDestination;
+ }
+
+ /**
+ * @return the subscription
+ */
+ public Subscription getSubscription(){
+ return this.subscription;
+ }
+
+ /**
+ * @param subscription the subscription to set
+ */
+ public void setSubscription(Subscription subscription){
+ this.subscription=subscription;
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sat Mar 3 03:30:22 2007
@@ -153,11 +153,11 @@
}
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sat Mar 3 03:30:22 2007
@@ -155,11 +155,11 @@
throw new BrokerStoppedException(this.message);
}
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
throw new BrokerStoppedException(this.message);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
throw new BrokerStoppedException(this.message);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sat Mar 3 03:30:22 2007
@@ -84,8 +84,8 @@
return getNext().getDestinations(destination);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
- getNext().acknowledge(context, ack);
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+ getNext().acknowledge(consumerExchange, ack);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
@@ -132,8 +132,8 @@
getNext().rollbackTransaction(context, xid);
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
- getNext().send(context, messageSend);
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ getNext().send(producerExchange, messageSend);
}
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?view=auto&rev=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Sat Mar 3 03:30:22 2007
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.state.ProducerState;
+
+/**
+ * Holds internal state in the broker for a MessageProducer
+ *
+ * @version $Revision: 1.8 $
+ */
+public class ProducerBrokerExchange{
+
+ private ConnectionContext connectionContext;
+ private Destination regionDestination;
+ private Region region;
+ private ProducerState producerState;
+ private boolean mutable=true;
+
+ /**
+ * @return the connectionContext
+ */
+ public ConnectionContext getConnectionContext(){
+ return this.connectionContext;
+ }
+
+ /**
+ * @param connectionContext the connectionContext to set
+ */
+ public void setConnectionContext(ConnectionContext connectionContext){
+ this.connectionContext=connectionContext;
+ }
+
+ /**
+ * @return the mutable
+ */
+ public boolean isMutable(){
+ return this.mutable;
+ }
+
+ /**
+ * @param mutable the mutable to set
+ */
+ public void setMutable(boolean mutable){
+ this.mutable=mutable;
+ }
+
+ /**
+ * @return the regionDestination
+ */
+ public Destination getRegionDestination(){
+ return this.regionDestination;
+ }
+
+ /**
+ * @param regionDestination the regionDestination to set
+ */
+ public void setRegionDestination(Destination regionDestination){
+ this.regionDestination=regionDestination;
+ }
+
+ /**
+ * @return the region
+ */
+ public Region getRegion(){
+ return this.region;
+ }
+
+ /**
+ * @param region the region to set
+ */
+ public void setRegion(Region region){
+ this.region=region;
+ }
+
+ /**
+ * @return the producerState
+ */
+ public ProducerState getProducerState(){
+ return this.producerState;
+ }
+
+ /**
+ * @param producerState the producerState to set
+ */
+ public void setProducerState(ProducerState producerState){
+ this.producerState=producerState;
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Sat Mar 3 03:30:22 2007
@@ -78,16 +78,20 @@
context.setInRecoveryMode(true);
context.setTransactions(new ConcurrentHashMap());
context.setProducerFlowControl(false);
-
+ final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setMutable(true);
+ producerExchange.setConnectionContext(context);
+ final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
+ consumerExchange.setConnectionContext(context);
transactionStore.recover(new TransactionRecoveryListener() {
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
try {
beginTransaction(context, xid);
for (int i = 0; i < addedMessages.length; i++) {
- send(context, addedMessages[i]);
+ send(producerExchange, addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
- acknowledge(context, aks[i]);
+ acknowledge(consumerExchange, aks[i]);
}
prepareTransaction(context, xid);
} catch (Throwable e) {
@@ -168,9 +172,10 @@
transaction.rollback();
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
// This method may be invoked recursively.
// Track original tx so that it can be restored.
+ final ConnectionContext context = consumerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction();
Transaction transaction=null;
if( ack.isInTransaction() ) {
@@ -178,15 +183,16 @@
}
context.setTransaction(transaction);
try {
- next.acknowledge(context, ack);
+ next.acknowledge(consumerExchange, ack);
} finally {
context.setTransaction(originalTx);
}
}
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
// This method may be invoked recursively.
// Track original tx so that it can be restored.
+ final ConnectionContext context = producerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction();
Transaction transaction=null;
if( message.getTransactionId()!=null ) {
@@ -194,7 +200,7 @@
}
context.setTransaction(transaction);
try {
- next.send(context, message);
+ next.send(producerExchange, message);
} finally {
context.setTransaction(originalTx);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sat Mar 3 03:30:22 2007
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -110,9 +111,11 @@
private boolean pendingStop;
private long timeStamp=0;
private AtomicBoolean stopped=new AtomicBoolean(false);
- protected final AtomicBoolean disposed=new AtomicBoolean(false);
+ private final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch=new CountDownLatch(1);
- protected final AtomicBoolean asyncException=new AtomicBoolean(false);
+ private final AtomicBoolean asyncException=new AtomicBoolean(false);
+ private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>();
+ private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
static class ConnectionState extends org.apache.activemq.state.ConnectionState{
@@ -427,33 +430,24 @@
public Response processMessage(Message messageSend) throws Exception{
ProducerId producerId=messageSend.getProducerId();
- ConnectionState state=lookupConnectionState(producerId);
- ConnectionContext context=state.getContext();
- // If the message originates from this client connection,
- // then, finde the associated producer state so we can do some dup detection.
- ProducerState producerState=null;
- if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
- SessionState ss=state.getSessionState(producerId.getParentId());
- if(ss==null)
- throw new IllegalStateException("Cannot send from a session that had not been registered: "
- +producerId.getParentId());
- producerState=ss.getProducerState(producerId);
- }
- if(producerState==null){
- broker.send(context,messageSend);
- }else{
- // Avoid Dups.
+ ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
+ ProducerState producerState=producerExchange.getProducerState();
+ if(producerState!=null){
long seq=messageSend.getMessageId().getProducerSequenceId();
if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq);
- broker.send(context,messageSend);
+ broker.send(producerExchange,messageSend);
}
+ }else{
+ // producer not local to this broker
+ broker.send(producerExchange,messageSend);
}
return null;
}
public Response processMessageAck(MessageAck ack) throws Exception{
- broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack);
+ ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
+ broker.acknowledge(consumerExchange,ack);
return null;
}
@@ -515,6 +509,7 @@
ProducerState ps=ss.removeProducer(id);
if(ps==null)
throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
+ removeProducerBrokerExchange(id);
broker.removeProducer(cs.getContext(),ps.getInfo());
return null;
}
@@ -551,6 +546,7 @@
if(consumerState==null)
throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
broker.removeConsumer(cs.getContext(),consumerState.getInfo());
+ removeConsumerBrokerExchange(id);
return null;
}
@@ -980,5 +976,54 @@
public String getRemoteAddress(){
return transport.getRemoteAddress();
+ }
+
+ private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
+ ProducerBrokerExchange result=producerExchanges.get(id);
+ if(result==null){
+ synchronized(producerExchanges){
+ result=new ProducerBrokerExchange();
+ ConnectionState state=lookupConnectionState(id);
+ ConnectionContext context=state.getContext();
+ result.setConnectionContext(context);
+ SessionState ss=state.getSessionState(id.getParentId());
+ if(ss!=null){
+ result.setProducerState(ss.getProducerState(id));
+ ProducerState producerState=ss.getProducerState(id);
+ if(producerState!=null&&producerState.getInfo()!=null){
+ ProducerInfo info=producerState.getInfo();
+ result.setMutable(info.getDestination()==null);
+ }
+ }
+ producerExchanges.put(id,result);
+ }
+ }
+ return result;
+ }
+
+ private void removeProducerBrokerExchange(ProducerId id) {
+ synchronized(producerExchanges) {
+ producerExchanges.remove(id);
+ }
+ }
+
+ private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
+ ConsumerBrokerExchange result = consumerExchanges.get(id);
+ if (result == null) {
+ synchronized(consumerExchanges) {
+ result = new ConsumerBrokerExchange();
+ ConnectionState state = lookupConnectionState(id);
+ ConnectionContext context = state.getContext();
+ result.setConnectionContext(context);
+ consumerExchanges.put(id,result);
+ }
+ }
+ return result;
+ }
+
+ private void removeConsumerBrokerExchange(ConsumerId id) {
+ synchronized(consumerExchanges) {
+ consumerExchanges.remove(id);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java Sat Mar 3 03:30:22 2007
@@ -33,9 +33,10 @@
super(next);
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ final ConnectionContext context = producerExchange.getConnectionContext();
String userID = context.getUserName();
messageSend.setUserID(userID);
- super.send(context, messageSend);
+ super.send(producerExchange, messageSend);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Sat Mar 3 03:30:22 2007
@@ -17,8 +17,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -298,13 +300,13 @@
* @throws Exception
*
*/
- public void send(ConnectionContext context,Message message) throws Exception{
+ public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
/**
* A message can be dispatched before the super.send() method returns so - here the order is switched to avoid
* problems on the slave with receiving acks for messages not received yey
*/
sendToSlave(message);
- super.send(context,message);
+ super.send(producerExchange,message);
}
/**
@@ -313,9 +315,9 @@
* @throws Exception
*
*/
- public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
+ public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
sendToSlave(ack);
- super.acknowledge(context,ack);
+ super.acknowledge(consumerExchange,ack);
}
public boolean isFaultTolerantConfiguration(){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Sat Mar 3 03:30:22 2007
@@ -25,7 +25,9 @@
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -300,17 +302,28 @@
throw new JMSException("Invalid operation.");
}
- public void send(ConnectionContext context, Message messageSend)
+ public void send(final ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception {
- Destination dest = lookup(context, messageSend.getDestination());
- dest.send(context, messageSend);
+ final ConnectionContext context = producerExchange.getConnectionContext();
+
+ if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) {
+ final Destination regionDestination = lookup(context,messageSend.getDestination());
+ producerExchange.setRegionDestination(regionDestination);
+ }
+
+ producerExchange.getRegionDestination().send(context, messageSend);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
- Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId());
- if( sub==null )
- throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
- sub.acknowledge(context, ack);
+ public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+ Subscription sub=consumerExchange.getSubscription();
+ if(sub==null){
+ sub=(Subscription)subscriptions.get(ack.getConsumerId());
+ if(sub==null){
+ throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
+ }
+ consumerExchange.setSubscription(sub);
+ }
+ sub.acknowledge(consumerExchange.getConnectionContext(),ack);
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Sat Mar 3 03:30:22 2007
@@ -19,6 +19,8 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -98,17 +100,18 @@
* Send a message to the broker using the specified destination. The destination specified
* in the message does not need to match the destination the message is sent to. This is
* handy in case the message is being sent to a dead letter destination.
- * @param context the environment the operation is being executed under.
+ * @param producerExchange the environment the operation is being executed under.
+ * @param message
* @throws Exception TODO
*/
- public void send(ConnectionContext context, Message message) throws Exception;
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception;
/**
* Used to acknowledge the receipt of a message by a client.
- * @param context the environment the operation is being executed under.
+ * @param consumerExchange the environment the operation is being executed under.
* @throws Exception TODO
*/
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception;
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception;
/**
* Allows a consumer to pull a message from a queue
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sat Mar 3 03:30:22 2007
@@ -31,7 +31,9 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -366,46 +368,56 @@
topicRegion.removeSubscription(context, info);
}
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
long si = sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si);
- ActiveMQDestination destination = message.getDestination();
- switch(destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.send(context, message);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.send(context, message);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.send(context, message);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.send(context, message);
- break;
- default:
- throw createUnknownDestinationTypeException(destination);
+ if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
+ ActiveMQDestination destination = message.getDestination();
+ Region region = null;
+ switch(destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ region = queueRegion;
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ region = topicRegion;
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ region = tempQueueRegion;
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ region = tempTopicRegion;
+ break;
+ default:
+ throw createUnknownDestinationTypeException(destination);
+ }
+ producerExchange.setRegion(region);
}
+ producerExchange.getRegion().send(producerExchange,message);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
- ActiveMQDestination destination = ack.getDestination();
- switch(destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.acknowledge(context, ack);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.acknowledge(context, ack);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.acknowledge(context, ack);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.acknowledge(context, ack);
- break;
- default:
- throw createUnknownDestinationTypeException(destination);
+ public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+ if(consumerExchange.getRegion()==null){
+ ActiveMQDestination destination=ack.getDestination();
+ Region region=null;
+ switch(destination.getDestinationType()){
+ case ActiveMQDestination.QUEUE_TYPE:
+ region=queueRegion;
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ region=topicRegion;
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ region=tempQueueRegion;
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ region=tempTopicRegion;
+ break;
+ default:
+ throw createUnknownDestinationTypeException(destination);
+ }
+ consumerExchange.setRegion(region);
}
+ consumerExchange.getRegion().acknowledge(consumerExchange,ack);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sat Mar 3 03:30:22 2007
@@ -24,6 +24,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@@ -242,7 +243,6 @@
if( message.isExpired() ) {
return;
}
-
if (context.isProducerFlowControl()) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
@@ -426,7 +426,7 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected void dispatch(ConnectionContext context, Message message) throws Exception {
+ protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
@@ -481,7 +481,10 @@
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
- context.getBroker().send(context, message);
+ ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setMutable(false);
+ producerExchange.setConnectionContext(context);
+ context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Sat Mar 3 03:30:22 2007
@@ -19,6 +19,8 @@
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.commons.logging.Log;
@@ -37,18 +39,18 @@
private Log sendLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Send");
private Log ackLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Ack");
- public void send(ConnectionContext context, Message messageSend) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
if (sendLog.isInfoEnabled()) {
sendLog.info("Sending: " + messageSend);
}
- super.send(context, messageSend);
+ super.send(producerExchange, messageSend);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (ackLog.isInfoEnabled()) {
ackLog.info("Acknowledge: " + ack);
}
- super.acknowledge(context, ack);
+ super.acknowledge(consumerExchange, ack);
}
// Properties
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Sat Mar 3 03:30:22 2007
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
@@ -37,11 +38,11 @@
* @version $Revision$
*/
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
//timestamp has not been disabled and has not passed through a network
message.setTimestamp(System.currentTimeMillis());
}
- super.send(context, message);
+ super.send(producerExchange, message);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java Sat Mar 3 03:30:22 2007
@@ -30,6 +30,8 @@
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
@@ -127,14 +129,14 @@
}
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
trace(messageSend);
- super.send(context, messageSend);
+ super.send(producerExchange, messageSend);
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
trace(ack);
- super.acknowledge(context, ack);
+ super.acknowledge(consumerExchange, ack);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java Sat Mar 3 03:30:22 2007
@@ -31,6 +31,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
@@ -106,8 +107,8 @@
}
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
- super.send(context, messageSend);
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ super.send(producerExchange, messageSend);
ProducerId producerId = messageSend.getProducerId();
ActiveMQDestination destination = messageSend.getDestination();
synchronized (lock) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java Sat Mar 3 03:30:22 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@@ -165,8 +166,8 @@
super.addProducer(context, info);
}
- public void send(ConnectionContext context, Message messageSend) throws Exception {
- SecurityContext subject = (SecurityContext) context.getSecurityContext();
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ SecurityContext subject = (SecurityContext) producerExchange.getConnectionContext().getSecurityContext();
if( subject == null )
throw new SecurityException("User is not authenticated.");
@@ -185,7 +186,7 @@
subject.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination());
}
- super.send(context, messageSend);
+ super.send(producerExchange, messageSend);
}
// SecurityAdminMBean interface
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sat Mar 3 03:30:22 2007
@@ -23,7 +23,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Message;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -74,9 +73,6 @@
}
public void oneway(Object command) throws IOException{
- if (command instanceof Message) {
- command = ((Message)command).copy();
- }
if(disposed){
throw new TransportDisposedIOException("Transport disposed.");
}
@@ -94,9 +90,6 @@
}
protected void syncOneWay(Object command){
- if (command instanceof Message) {
- command = ((Message)command).copy();
- }
final TransportListener tl=peer.transportListener;
prePeerSetQueue=peer.prePeerSetQueue;
if(tl==null){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Sat Mar 3 03:30:22 2007
@@ -17,6 +17,7 @@
package org.apache.activemq.util;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -39,7 +40,10 @@
boolean originalFlowControl=context.isProducerFlowControl();
try{
context.setProducerFlowControl(false);
- context.getBroker().send(context,message);
+ ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setMutable(true);
+ producerExchange.setConnectionContext(context);
+ context.getBroker().send(producerExchange,message);
}finally{
context.setProducerFlowControl(originalFlowControl);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Sat Mar 3 03:30:22 2007
@@ -173,7 +173,7 @@
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
}
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -212,7 +212,7 @@
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
}
- public void send(ConnectionContext context, Message message) throws Exception {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
public void start() throws Exception {