You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/01/25 00:08:38 UTC
svn commit: r372050 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/ft/
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/ft/
Author: rajdavies
Date: Tue Jan 24 15:08:29 2006
New Revision: 372050
URL: http://svn.apache.org/viewcvs?rev=372050&view=rev
Log:
Fixes for Master-Slave functionality
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Removed:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Tue Jan 24 15:08:29 2006
@@ -23,6 +23,7 @@
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -30,6 +31,7 @@
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@@ -131,6 +133,17 @@
super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
+
+ public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
+ super.addConsumer(context, info);
+ sendAsyncToSlave(info);
+ }
+
+
+ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable {
+ super.removeSubscription(context, info);
+ sendAsyncToSlave(info);
+ }
@@ -163,6 +176,7 @@
super.rollbackTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
+
}
/**
@@ -174,7 +188,7 @@
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{
super.commitTransaction(context, xid,onePhase);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
- sendAsyncToSlave(info);
+ sendSyncToSlave(info);
}
/**
@@ -205,26 +219,40 @@
}
public void send(ConnectionContext context, Message message) throws Throwable{
+ /**
+ * A message can be dispatched before the super.send() method returns
+ * so - here the order is switched to avoid problems on the slave
+ * with receiving acks for messages not received yey
+ */
+ sendToSlave(message);
super.send(context,message);
- sendAsyncToSlave(message);
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{
super.acknowledge(context, ack);
- sendAsyncToSlave(ack);
+ sendToSlave(ack);
}
protected void sendToSlave(Message message){
- /*
- if (message.isPersistent()){
+
+ if (message.isPersistent() && !message.isInTransaction()){
sendSyncToSlave(message);
}else{
sendAsyncToSlave(message);
}
- */
- sendAsyncToSlave(message);
+
+
+ }
+
+ protected void sendToSlave(MessageAck ack){
+
+ if (ack.isInTransaction()){
+ sendAsyncToSlave(ack);
+ }else{
+ sendSyncToSlave(ack);
+ }
}
protected void sendAsyncToSlave(Command command){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Tue Jan 24 15:08:29 2006
@@ -233,6 +233,6 @@
private void shutDown(){
masterActive.set(false);
broker.masterFailed();
- //ServiceSupport.dispose(this);
+ ServiceSupport.dispose(this);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Tue Jan 24 15:08:29 2006
@@ -57,6 +57,8 @@
private Message message;
/** The number of times the message has requested being hardened */
private int referenceCount;
+ /** the size of the message **/
+ private int cachedSize = 0;
/**
* Only used by the END_OF_BROWSE_MARKER singleton
@@ -69,6 +71,7 @@
this.groupID = null;
this.groupSequence = 0;
this.targetConsumerId=null;
+ this.cachedSize = message != null ? message.getSize() : 0;
}
public IndirectMessageReference(Destination destination, Message message) {
@@ -81,7 +84,8 @@
this.targetConsumerId=message.getTargetConsumerId();
this.referenceCount=1;
- message.incrementReferenceCount();
+ message.incrementReferenceCount();
+ this.cachedSize = message != null ? message.getSize() : 0;
}
synchronized public Message getMessageHardRef() {
@@ -201,5 +205,13 @@
public ConsumerId getTargetConsumerId() {
return targetConsumerId;
+ }
+
+ public int getSize(){
+ Message msg = message;
+ if (msg != null){
+ return msg.getSize();
+ }
+ return cachedSize;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Tue Jan 24 15:08:29 2006
@@ -46,5 +46,6 @@
public int incrementReferenceCount();
public int decrementReferenceCount();
public ConsumerId getTargetConsumerId();
+ public int getSize();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=372050&r1=372049&r2=372050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Jan 24 15:08:29 2006
@@ -1,18 +1,15 @@
/**
- *
+ *
* Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region;
@@ -29,217 +26,186 @@
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
-
/**
* A subscription that honors the pre-fetch option of the ConsumerInfo.
*
* @version $Revision: 1.15 $
*/
-abstract public class PrefetchSubscription extends AbstractSubscription {
- static private final Log log = LogFactory.getLog(PrefetchSubscription.class);
-
- final protected LinkedList matched = new LinkedList();
- final protected LinkedList dispatched = new LinkedList();
-
+abstract public class PrefetchSubscription extends AbstractSubscription{
+ static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
+ final protected LinkedList matched=new LinkedList();
+ final protected LinkedList dispatched=new LinkedList();
protected int delivered=0;
-
int preLoadLimit=1024*100;
int preLoadSize=0;
boolean dispatching=false;
-
- public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- super(broker,context, info);
+
+ public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
+ throws InvalidSelectorException{
+ super(broker,context,info);
}
- synchronized public void add(MessageReference node) throws Throwable {
- if( !isFull() && !isSlaveBroker()) {
+ synchronized public void add(MessageReference node) throws Throwable{
+ if(!isFull()&&!isSlaveBroker()){
dispatch(node);
- } else {
+ }else{
synchronized(matched){
matched.addLast(node);
}
}
-
}
-
- public void processMessageDispatchNotification(MessageDispatchNotification mdn){
+
+ public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matched){
- for (Iterator i = matched.iterator(); i.hasNext();){
- MessageReference node = (MessageReference)i.next();
- if (node.getMessageId().equals(mdn.getMessageId())){
+ for(Iterator i=matched.iterator();i.hasNext();){
+ MessageReference node=(MessageReference) i.next();
+ if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
- try {
- MessageDispatch md = createMessageDispatch(node, node.getMessage());
- dispatched.addLast(node);
-
- incrementPreloadSize(node.getMessage().getSize());
- node.decrementReferenceCount();
+ try{
+ MessageDispatch md=createMessageDispatch(node,node.getMessage());
+ dispatched.addLast(node);
+ incrementPreloadSize(node.getSize());
+ node.decrementReferenceCount();
}catch(Exception e){
- log.error("Problem processing MessageDispatchNotification: " + mdn,e);
+ log.error("Problem processing MessageDispatchNotification: "+mdn,e);
}
break;
}
}
}
}
-
- synchronized public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable {
-
+
+ synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{
// Handle the standard acknowledgment case.
- boolean wasFull = isFull();
- if( ack.isStandardAck() ) {
-
+ boolean wasFull=isFull();
+ if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
boolean inAckRange=false;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = (MessageReference)iter.next();
- MessageId messageId = node.getMessageId();
-
- if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
+ for(Iterator iter=dispatched.iterator();iter.hasNext();){
+ final MessageReference node=(MessageReference) iter.next();
+ MessageId messageId=node.getMessageId();
+ if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+ inAckRange=true;
}
-
- if( inAckRange ) {
-
+ if(inAckRange){
// Don't remove the nodes until we are committed.
- if ( !context.isInTransaction() ) {
+ if(!context.isInTransaction()){
iter.remove();
- } else {
+ }else{
// setup a Synchronization to remove nodes from the dispatched list.
context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Throwable {
- synchronized(PrefetchSubscription.this) {
-
+ public void afterCommit() throws Throwable{
+ synchronized(PrefetchSubscription.this){
// Now that we are committed, we can remove the nodes.
boolean inAckRange=false;
int index=0;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = (MessageReference)iter.next();
- MessageId messageId = node.getMessageId();
- if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
+ for(Iterator iter=dispatched.iterator();iter.hasNext();){
+ final MessageReference node=(MessageReference) iter.next();
+ MessageId messageId=node.getMessageId();
+ if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+ inAckRange=true;
}
- if( inAckRange ) {
+ if(inAckRange){
index++;
iter.remove();
- if( ack.getLastMessageId().equals(messageId)) {
- delivered = Math.max(0, delivered - (index+1));
+ if(ack.getLastMessageId().equals(messageId)){
+ delivered=Math.max(0,delivered-(index+1));
return;
}
}
}
-
}
}
- });
+ });
}
-
index++;
- acknowledge(context, ack, node);
- if( ack.getLastMessageId().equals(messageId)) {
- if ( context.isInTransaction() )
- delivered = Math.max(delivered,index+1);
- else
- delivered = Math.max(0, delivered - (index+1));
-
- if( wasFull && !isFull() ) {
+ acknowledge(context,ack,node);
+ if(ack.getLastMessageId().equals(messageId)){
+ if(context.isInTransaction())
+ delivered=Math.max(delivered,index+1);
+ else
+ delivered=Math.max(0,delivered-(index+1));
+ if(wasFull&&!isFull()){
dispatchMatched();
}
return;
- } else {
-// System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
+ }else{
+ // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
}
}
-
}
log.info("Could not correlate acknowledgment with dispatched message: "+ack);
-
- } else if( ack.isDeliveredAck() ) {
-
+ }else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters.
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();index++) {
- final MessageReference node = (MessageReference)iter.next();
- if( ack.getLastMessageId().equals(node.getMessageId()) ) {
- delivered = Math.max(delivered,index+1);
- if( wasFull && !isFull() ) {
+ for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
+ final MessageReference node=(MessageReference) iter.next();
+ if(ack.getLastMessageId().equals(node.getMessageId())){
+ delivered=Math.max(delivered,index+1);
+ if(wasFull&&!isFull()){
dispatchMatched();
}
return;
}
}
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
-
- } else if( ack.isPoisonAck() ) {
-
+ }else if(ack.isPoisonAck()){
// TODO: what if the message is already in a DLQ???
-
- // Handle the poison ACK case: we need to send the message to a DLQ
- if( ack.isInTransaction() )
+ // Handle the poison ACK case: we need to send the message to a DLQ
+ if(ack.isInTransaction())
throw new JMSException("Poison ack cannot be transacted: "+ack);
-
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
boolean inAckRange=false;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = (MessageReference)iter.next();
- MessageId messageId = node.getMessageId();
-
- if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
+ for(Iterator iter=dispatched.iterator();iter.hasNext();){
+ final MessageReference node=(MessageReference) iter.next();
+ MessageId messageId=node.getMessageId();
+ if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+ inAckRange=true;
}
-
- if( inAckRange ) {
-
+ if(inAckRange){
// Send the message to the DLQ
node.incrementReferenceCount();
- try {
- Message message = node.getMessage();
- if( message !=null ) {
-
- // The original destination and transaction id do not get filled when the message is first sent,
+ try{
+ Message message=node.getMessage();
+ if(message!=null){
+ // The original destination and transaction id do not get filled when the message is first
+ // sent,
// it is only populated if the message is routed to another destination like the DLQ
- if( message.getOriginalDestination()!=null )
+ if(message.getOriginalDestination()!=null)
message.setOriginalDestination(message.getDestination());
- if( message.getOriginalTransactionId()!=null )
+ if(message.getOriginalTransactionId()!=null)
message.setOriginalTransactionId(message.getTransactionId());
-
- DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
- ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
+ DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+ ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
+ .getDestination());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
message.evictMarshlledForm();
-
- boolean originalFlowControl = context.isProducerFlowControl();
- try {
+ boolean originalFlowControl=context.isProducerFlowControl();
+ try{
context.setProducerFlowControl(false);
- context.getBroker().send(context, message);
- } finally {
+ context.getBroker().send(context,message);
+ }finally{
context.setProducerFlowControl(originalFlowControl);
}
-
- }
- } finally {
+ }
+ }finally{
node.decrementReferenceCount();
}
-
iter.remove();
index++;
- acknowledge(context, ack, node);
- if( ack.getLastMessageId().equals(messageId)) {
-
- delivered = Math.max(0, delivered - (index+1));
-
- if( wasFull && !isFull() ) {
+ acknowledge(context,ack,node);
+ if(ack.getLastMessageId().equals(messageId)){
+ delivered=Math.max(0,delivered-(index+1));
+ if(wasFull&&!isFull()){
dispatchMatched();
}
return;
@@ -248,128 +214,115 @@
}
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}
-
throw new JMSException("Invalid acknowledgment: "+ack);
}
-
- protected boolean isFull() {
- return dispatched.size()-delivered >= info.getPrefetchSize() || preLoadSize > preLoadLimit;
- }
-
- protected void dispatchMatched() throws IOException {
- if(!dispatching) {
- dispatching = true;
- try {
- for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) {
- MessageReference node = (MessageReference) iter.next();
+
+ protected boolean isFull(){
+ return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+ }
+
+ protected void dispatchMatched() throws IOException{
+ if(!dispatching){
+ dispatching=true;
+ try{
+ for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
+ MessageReference node=(MessageReference) iter.next();
iter.remove();
dispatch(node);
}
- } finally {
+ }finally{
dispatching=false;
}
}
}
-
-
- private void dispatch(final MessageReference node) throws IOException {
+ private void dispatch(final MessageReference node) throws IOException{
node.incrementReferenceCount();
-
- final Message message = node.getMessage();
- if( message == null ) {
+ final Message message=node.getMessage();
+ if(message==null){
return;
- }
-
+ }
// Make sure we can dispatch a message.
- if( canDispatch(node) && !isSlaveBroker()) {
-
- MessageDispatch md = createMessageDispatch(node, message);
+ if(canDispatch(node)&&!isSlaveBroker()){
+ MessageDispatch md=createMessageDispatch(node,message);
dispatched.addLast(node);
-
- incrementPreloadSize(node.getMessage().getSize());
-
- if( info.isDispatchAsync() ) {
+ incrementPreloadSize(node.getMessage().getSize());
+ if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
- public void run() {
- // Since the message gets queued up in async dispatch, we don't want to
+ public void run(){
+ // Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire.
- onDispatch(node, message);
+ onDispatch(node,message);
}
});
context.getConnection().dispatchAsync(md);
- } else {
+ }else{
context.getConnection().dispatchSync(md);
- onDispatch(node, message);
+ onDispatch(node,message);
}
// The onDispatch() does the node.decrementReferenceCount();
- } else {
+ }else{
// We were not allowed to dispatch that message (an other consumer grabbed it before we did)
node.decrementReferenceCount();
}
-
}
- synchronized private void onDispatch(final MessageReference node, final Message message) {
-
- boolean wasFull = isFull();
- decrementPreloadSize(message.getSize());
+ synchronized private void onDispatch(final MessageReference node,final Message message){
+ boolean wasFull=isFull();
+ decrementPreloadSize(message.getSize());
node.decrementReferenceCount();
-
- if( node.getRegionDestination() !=null ) {
+ if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
-
- if( wasFull && !isFull() ) {
- try {
+ if(wasFull&&!isFull()){
+ try{
dispatchMatched();
- } catch (IOException e) {
+ }catch(IOException e){
context.getConnection().serviceException(e);
}
}
}
-
}
-
- private int incrementPreloadSize(int size) {
- preLoadSize += size;
+
+ private int incrementPreloadSize(int size){
+ preLoadSize+=size;
return preLoadSize;
}
-
- private int decrementPreloadSize(int size) {
- preLoadSize -= size;
+
+ private int decrementPreloadSize(int size){
+ preLoadSize-=size;
return preLoadSize;
}
-
/**
* @param node
- * @param message TODO
+ * @param message
+ * TODO
* @return
*/
- protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
- MessageDispatch md = new MessageDispatch();
- md.setConsumerId( info.getConsumerId() );
- md.setDestination( node.getRegionDestination().getActiveMQDestination() );
+ protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
+ MessageDispatch md=new MessageDispatch();
+ md.setConsumerId(info.getConsumerId());
+ md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message);
- md.setRedeliveryCounter( node.getRedeliveryCounter() );
+ md.setRedeliveryCounter(node.getRedeliveryCounter());
return md;
}
-
+
/**
* Use when a matched message is about to be dispatched to the client.
*
* @param node
- * @return false if the message should not be dispatched to the client (another sub may have already dispatched it for example).
+ * @return false if the message should not be dispatched to the client (another sub may have already dispatched it
+ * for example).
*/
abstract protected boolean canDispatch(MessageReference node);
-
+
/**
* Used during acknowledgment to remove the message.
- * @throws IOException
+ *
+ * @throws IOException
*/
- protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {
- }
-
-
+ protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
+ throws IOException{}
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=372050&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Tue Jan 24 15:08:29 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ *Test failover for Queues
+ *
+ */
+public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsTest{
+
+
+
+ protected BrokerService master;
+ protected BrokerService slave;
+ protected int inflightMessageCount = 0;
+ protected int failureCount = 50;
+ protected String uriString="failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
+
+ protected void setUp() throws Exception{
+ failureCount = super.messageCount/2;
+ super.topic = isTopic();
+ BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
+ brokerFactory.afterPropertiesSet();
+ master=brokerFactory.getBroker();
+ brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
+ brokerFactory.afterPropertiesSet();
+ slave=brokerFactory.getBroker();
+ master.start();
+ slave.start();
+ // wait for thing to connect
+ Thread.sleep(1000);
+ super.setUp();
+
+ }
+
+ protected void tearDown() throws Exception{
+ super.tearDown();
+ slave.stop();
+ master.stop();
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+ return new ActiveMQConnectionFactory(uriString);
+ }
+
+ protected void messageSent() throws Exception{
+ if (++inflightMessageCount >= failureCount){
+ inflightMessageCount = 0;
+ master.stop();
+ }
+ }
+
+ protected boolean isTopic(){
+ return false;
+ }
+}