You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/25 22:47:17 UTC

svn commit: r1438734 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/ activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/j...

Author: gtully
Date: Fri Jan 25 21:47:17 2013
New Revision: 1438734

URL: http://svn.apache.org/viewvc?rev=1438734&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4276 - deal with destnotexist for temps. support alwaysSyncSend for duplex. improve reporting of failures. additional test.

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java   (with props)
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java   (contents, props changed)
      - copied, changed from r1438666, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/DestinationAlreadyExistsException.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java   (with props)
Removed:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/DestinationAlreadyExistsException.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Fri Jan 25 21:47:17 2013
@@ -518,7 +518,7 @@ public class AdvisoryBroker extends Brok
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
+    public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
         if (getBrokerService().isStarted()) {
             //set properties
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Jan 25 21:47:17 2013
@@ -291,7 +291,7 @@ public class TransportConnection impleme
                 // Record the error that caused the transport to stop
                 this.stopError = e;
                 // Wait a little bit to try to get the output buffer to flush
-                // the exption notification to the client.
+                // the exception notification to the client.
                 try {
                     Thread.sleep(500);
                 } catch (InterruptedException ie) {
@@ -1326,12 +1326,7 @@ public class TransportConnection impleme
                     }
                     setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                 }
-                URI uri = broker.getVmConnectorURI();
-                HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-                map.put("network", "true");
-                map.put("async", "false");
-                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-                Transport localTransport = TransportFactory.connect(uri);
+                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
                 Transport remoteBridgeTransport = new ResponseCorrelator(transport);
                 String duplexName = localTransport.toString();
                 if (duplexName.contains("#")) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Jan 25 21:47:17 2013
@@ -28,15 +28,13 @@ import java.util.concurrent.locks.Reentr
 import javax.jms.JMSException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
-import org.apache.activemq.broker.DestinationAlreadyExistsException;
+import org.apache.activemq.DestinationDoesNotExistException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -147,7 +145,7 @@ public abstract class AbstractRegion imp
                     addSubscriptionsForDestination(context, dest);
                 }
                 if (dest == null) {
-                    throw new JMSException("The destination " + destination + " does not exist.");
+                    throw new DestinationDoesNotExistException(destination.getQualifiedName());
                 }
             }
             return dest;
@@ -451,13 +449,8 @@ public abstract class AbstractRegion imp
                 // Try to auto create the destination... re-invoke broker
                 // from the
                 // top so that the proper security checks are performed.
-                try {
-                    context.getBroker().addDestination(context, destination, createTemporary);
-                    dest = addDestination(context, destination, false);
-                } catch (DestinationAlreadyExistsException e) {
-                    // if the destination already exists then lets ignore
-                    // this error
-                }
+                context.getBroker().addDestination(context, destination, createTemporary);
+                dest = addDestination(context, destination, false);
                 // We should now have the dest created.
                 destinationsLock.readLock().lock();
                 try {

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java?rev=1438734&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java Fri Jan 25 21:47:17 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * Priority dispatch policy that sends a message to every subscription that
+ * matches the message in consumer priority order.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ */
+public class PriorityDispatchPolicy extends SimpleDispatchPolicy {
+
+    private final Comparator<? super Subscription> orderedCompare = new Comparator<Subscription>() {
+        @Override
+        public int compare(Subscription o1, Subscription o2) {
+            // We want the list sorted in descending order
+            return o2.getConsumerInfo().getPriority() - o1.getConsumerInfo().getPriority();
+        }
+    };
+
+    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
+            throws Exception {
+        ArrayList<Subscription> ordered = new ArrayList<Subscription>(consumers);
+        Collections.sort(ordered, orderedCompare);
+
+        StringBuffer stringBuffer = new StringBuffer();
+        for (Subscription sub: ordered) {
+            stringBuffer.append(sub.getConsumerInfo().getPriority());
+            stringBuffer.append(',');
+        }
+        //System.err.println("Priority:" + stringBuffer.toString() + ", msg: " + node.getMessage());
+        return super.dispatch(node, msgContext, ordered);
+    }
+
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PriorityDispatchPolicy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java Fri Jan 25 21:47:17 2013
@@ -28,6 +28,6 @@ import org.apache.activemq.command.Netwo
  */
 public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
     public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
-        return new NetworkBridgeFilter(remoteBrokerPath[0], networkTimeToLive);
+        return new NetworkBridgeFilter(info, remoteBrokerPath[0], networkTimeToLive);
     }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jan 25 21:47:17 2013
@@ -34,9 +34,12 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.ObjectName;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.DestinationDoesNotExistException;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
@@ -72,11 +75,13 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IdGenerator;
@@ -140,6 +145,8 @@ public abstract class DemandForwardingBr
     private BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
     private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
+    private Transport duplexInboundLocalBroker = null;
+    private ProducerInfo duplexInboundLocalProducerInfo;
 
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration = configuration;
@@ -163,6 +170,24 @@ public abstract class DemandForwardingBr
                 throw new IllegalArgumentException("BrokerService is null on " + this);
             }
 
+            if (isDuplex()) {
+                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
+                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
+
+                    @Override
+                    public void onCommand(Object o) {
+                        Command command = (Command) o;
+                        serviceLocalCommand(command);
+                    }
+
+                    @Override
+                    public void onException(IOException error) {
+                        serviceLocalException(error);
+                    }
+                });
+                duplexInboundLocalBroker.start();
+            }
+
             localBroker.setTransportListener(new DefaultTransportListener() {
 
                 @Override
@@ -269,6 +294,28 @@ public abstract class DemandForwardingBr
                     localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                     localBroker.oneway(localSessionInfo);
 
+                    if (configuration.isDuplex()) {
+                        // separate inbound channel for forwards so we don't contend with outbound dispatch on same connection
+                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
+                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName());
+                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
+                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
+
+                        if (originalTransport instanceof SslTransport) {
+                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
+                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
+                        }
+                        // sync requests that may fail
+                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
+                        if (resp instanceof ExceptionResponse) {
+                            throw ((ExceptionResponse)resp).getException();
+                        }
+                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
+                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
+                        duplexInboundLocalBroker.oneway(duplexInboundSession);
+                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
+                    }
                     brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
                     NetworkBridgeListener l = this.networkBridgeListener;
                     if (l != null) {
@@ -388,6 +435,7 @@ public abstract class DemandForwardingBr
                     ServiceStopper ss = new ServiceStopper();
                     ss.stop(remoteBroker);
                     ss.stop(localBroker);
+                    ss.stop(duplexInboundLocalBroker);
                     // Release the started Latch since another thread could be
                     // stuck waiting for it to start up.
                     startedLatch.countDown();
@@ -466,8 +514,11 @@ public abstract class DemandForwardingBr
                     serviceRemoteException(ce.getException());
                 } else {
                     if (isDuplex()) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(configuration.getBrokerName() + " duplex command type: "+ command.getCommandId());
+                        }
                         if (command.isMessage()) {
-                            ActiveMQMessage message = (ActiveMQMessage) command;
+                            final ActiveMQMessage message = (ActiveMQMessage) command;
                             if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
                                 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
@@ -476,69 +527,83 @@ public abstract class DemandForwardingBr
                                 if (!isPermissableDestination(message.getDestination(), true)) {
                                     return;
                                 }
-                                if (message.isResponseRequired()) {
-                                    Response reply = new Response();
-                                    reply.setCorrelationId(message.getCommandId());
-                                    localBroker.oneway(message);
-                                    remoteBroker.oneway(reply);
+                                // message being forwarded - we need to propagate the response to our local send
+                                message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
+                                if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
+                                    duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
+                                        final int correlationId = message.getCommandId();
+                                        @Override
+                                        public void onCompletion(FutureResponse resp) {
+                                            try {
+                                                Response reply = resp.getResult();
+                                                reply.setCorrelationId(correlationId);
+                                                remoteBroker.oneway(reply);
+                                            } catch (IOException error) {
+                                                LOG.error("Exception: " + error + " on duplex forward of: " + message);
+                                                serviceRemoteException(error);
+                                            }
+                                        }
+                                    });
                                 } else {
-                                    localBroker.oneway(message);
+                                    duplexInboundLocalBroker.oneway(message);
                                 }
                             }
                         } else {
                             switch (command.getDataStructureType()) {
-                            case ConnectionInfo.DATA_STRUCTURE_TYPE:
-                            case SessionInfo.DATA_STRUCTURE_TYPE:
-                            case ProducerInfo.DATA_STRUCTURE_TYPE:
-                                localBroker.oneway(command);
-                                break;
-                            case MessageAck.DATA_STRUCTURE_TYPE:
-                                MessageAck ack = (MessageAck) command;
-                                DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
-                                if (localSub != null) {
-                                    ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
-                                    localBroker.oneway(ack);
-                                } else {
-                                    LOG.warn("Matching local subscription not found for ack: " + ack);
-                                }
-                                break;
-                            case ConsumerInfo.DATA_STRUCTURE_TYPE:
-                                localStartedLatch.await();
-                                if (started.get()) {
-                                    if (!addConsumerInfo((ConsumerInfo) command)) {
-                                        if (LOG.isDebugEnabled()) {
-                                            LOG.debug("Ignoring ConsumerInfo: " + command);
-                                        }
+                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
+                                case SessionInfo.DATA_STRUCTURE_TYPE:
+                                    localBroker.oneway(command);
+                                    break;
+                                case ProducerInfo.DATA_STRUCTURE_TYPE:
+                                    // using duplexInboundLocalProducerInfo
+                                    break;
+                                case MessageAck.DATA_STRUCTURE_TYPE:
+                                    MessageAck ack = (MessageAck) command;
+                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
+                                    if (localSub != null) {
+                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
+                                        localBroker.oneway(ack);
                                     } else {
-                                        if (LOG.isTraceEnabled()) {
-                                            LOG.trace("Adding ConsumerInfo: " + command);
+                                        LOG.warn("Matching local subscription not found for ack: " + ack);
+                                    }
+                                    break;
+                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
+                                    localStartedLatch.await();
+                                    if (started.get()) {
+                                        if (!addConsumerInfo((ConsumerInfo) command)) {
+                                            if (LOG.isDebugEnabled()) {
+                                                LOG.debug("Ignoring ConsumerInfo: " + command);
+                                            }
+                                        } else {
+                                            if (LOG.isTraceEnabled()) {
+                                                LOG.trace("Adding ConsumerInfo: " + command);
+                                            }
                                         }
+                                    } else {
+                                        // received a subscription whilst stopping
+                                        LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
+                                    }
+                                    break;
+                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
+                                    // initiator is shutting down, controlled case
+                                    // abortive close dealt with by inactivity monitor
+                                    LOG.info("Stopping network bridge on shutdown of remote broker");
+                                    serviceRemoteException(new IOException(command.toString()));
+                                    break;
+                                default:
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Ignoring remote command: " + command);
                                     }
-                                } else {
-                                    // received a subscription whilst stopping
-                                    LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
-                                }
-                                break;
-                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
-                                // initiator is shutting down, controlled case
-                                // abortive close dealt with by inactivity monitor
-                                LOG.info("Stopping network bridge on shutdown of remote broker");
-                                serviceRemoteException(new IOException(command.toString()));
-                                break;
-                            default:
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Ignoring remote command: " + command);
-                                }
                             }
                         }
                     } else {
                         switch (command.getDataStructureType()) {
-                        case KeepAliveInfo.DATA_STRUCTURE_TYPE:
-                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
-                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
-                            break;
-                        default:
-                            LOG.warn("Unexpected remote command: " + command);
+                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
+                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
+                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
+                                break;
+                            default:
+                                LOG.warn("Unexpected remote command: " + command);
                         }
                     }
                 }
@@ -659,7 +724,29 @@ public abstract class DemandForwardingBr
 
     @Override
     public void serviceLocalException(Throwable error) {
+        serviceLocalException(null, error);
+    }
+
+    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
+
         if (!disposed.get()) {
+            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary() ) {
+                // not a reason to terminate the bridge - temps can disappear with pending sends as the demand sub may outlive the remote dest
+                if (messageDispatch != null) {
+                    LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " +  error);
+                    try {
+                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
+                        poisonAck.setPoisonCause(error);
+                        localBroker.oneway(poisonAck);
+                    } catch (IOException ioe) {
+                        LOG.error("Failed to posion ack message following forward failure: " + ioe, ioe);
+                    }
+                    fireFailedForwardAdvisory(messageDispatch, error);
+                } else {
+                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: " +  error, error);
+                }
+                return;
+            }
             LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
             LOG.debug("The local Exception was:" + error, error);
             brokerService.getTaskRunnerFactory().execute(new Runnable() {
@@ -672,6 +759,33 @@ public abstract class DemandForwardingBr
         }
     }
 
+    /**
+     * Publishes an advisory on the network bridge forward-failure topic for a
+     * message that could not be forwarded.
+     * <p>
+     * Nothing happens unless the bridge configuration enables
+     * advisoryForFailedForward and the broker exposes an AdvisoryBroker adaptor.
+     * Any failure while firing the advisory is logged and not propagated.
+     *
+     * @param messageDispatch the dispatch whose message failed to forward
+     * @param error the forwarding failure; its localized message is sent as the
+     *              "cause" property of the advisory
+     */
+    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
+        if (!configuration.isAdvisoryForFailedForward()) {
+            return;
+        }
+        try {
+            final AdvisoryBroker advisories = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
+            if (advisories == null) {
+                return;
+            }
+
+            // fire with the broker's own security context
+            final ConnectionContext brokerContext = new ConnectionContext();
+            brokerContext.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+            brokerContext.setBroker(brokerService.getBroker());
+
+            final ActiveMQMessage advisory = new ActiveMQMessage();
+            advisory.setStringProperty("cause", error.getLocalizedMessage());
+            advisories.fireAdvisory(brokerContext,
+                    AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
+                    messageDispatch.getMessage(), null, advisory);
+        } catch (Exception e) {
+            LOG.warn("failed to fire forward failure advisory, cause: " + e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("detail", e);
+            }
+        }
+    }
+
     protected Service getControllingService() {
         return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
     }
@@ -684,11 +798,11 @@ public abstract class DemandForwardingBr
 
     protected void removeSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(configuration.getBrokerName() + " remove local subscription:"
+                        + sub.getLocalInfo().getConsumerId()
+                        + " for remote " + sub.getRemoteInfo().getConsumerId());
             }
-            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
-            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding responses
             // serialise with removeDestination operations so that removeSubs are serialised with removeDestinations
@@ -701,6 +815,9 @@ public abstract class DemandForwardingBr
                         localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
+                    } finally {
+                        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+                        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
                     }
                 }
             });
@@ -778,13 +895,13 @@ public abstract class DemandForwardingBr
                                         Response response = future.getResult();
                                         if (response.isException()) {
                                             ExceptionResponse er = (ExceptionResponse) response;
-                                            serviceLocalException(er.getException());
+                                            serviceLocalException(md, er.getException());
                                         } else {
                                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                             dequeueCounter.incrementAndGet();
                                         }
                                     } catch (IOException e) {
-                                        serviceLocalException(e);
+                                        serviceLocalException(md, e);
                                     } finally {
                                         sub.decrementOutstandingResponses();
                                     }
@@ -1195,6 +1312,7 @@ public abstract class DemandForwardingBr
 
     final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
         ConsumerInfo info = new ConsumerInfo();
+        info.setNetworkSubscription(true);
         info.setDestination(destination);
 
         // Indicate that this subscription is being made on behalf of the remote broker.

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Jan 25 21:47:17 2013
@@ -57,6 +57,7 @@ public class NetworkBridgeConfiguration 
     private boolean alwaysSyncSend = false;
     private boolean staticBridge = false;
     private boolean useCompression = false;
+    private boolean advisoryForFailedForward = false;
 
     /**
      * @return the conduitSubscriptions
@@ -385,4 +386,12 @@ public class NetworkBridgeConfiguration 
     public boolean isUseCompression() {
         return useCompression;
     }
+
+    /**
+     * @return the advisoryForFailedForward flag (false unless set)
+     */
+    public boolean isAdvisoryForFailedForward() {
+        return this.advisoryForFailedForward;
+    }
+
+    /**
+     * @param advisoryForFailedForward new value of the advisoryForFailedForward flag
+     */
+    public void setAdvisoryForFailedForward(boolean advisoryForFailedForward) {
+        this.advisoryForFailedForward = advisoryForFailedForward;
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Fri Jan 25 21:47:17 2013
@@ -16,7 +16,12 @@
  */
 package org.apache.activemq.network;
 
+import java.net.URI;
+import java.util.HashMap;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.util.URISupport;
 
 /**
  * Factory for network bridges
@@ -65,4 +70,13 @@ public final class NetworkBridgeFactory 
         }
         return result;
     }
+
+    /**
+     * Connects a transport to the broker's VM connector URI, forcing the query
+     * parameters network=true and async=false on top of whatever parameters the
+     * URI already carries.
+     *
+     * @param broker the broker whose VM connector URI is used
+     * @return the transport returned by TransportFactory.connect for that URI
+     * @throws Exception if the URI cannot be parsed/rebuilt or the connect fails
+     */
+    public static Transport createLocalTransport(Broker broker) throws Exception {
+        final URI vmUri = broker.getVmConnectorURI();
+        final HashMap<String, String> options = new HashMap<String, String>(URISupport.parseParameters(vmUri));
+        options.put("network", "true");
+        options.put("async", "false");
+        final String query = URISupport.createQueryString(options);
+        return TransportFactory.connect(URISupport.createURIWithQuery(vmUri, query));
+    }
 }

Copied: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java (from r1438666, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/DestinationAlreadyExistsException.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java?p2=activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java&p1=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/DestinationAlreadyExistsException.java&r1=1438666&r2=1438734&rev=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/DestinationAlreadyExistsException.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java Fri Jan 25 21:47:17 2013
@@ -14,26 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker;
+package org.apache.activemq;
 
 import javax.jms.JMSException;
 
-import org.apache.activemq.command.ActiveMQDestination;
-
 /**
- * An exception thrown if a destination is attempted to be created when it already exists.
- * 
+ * An exception thrown on a send if a destination does not exist.
+ * Allows a network bridge to easily cherry-pick and ignore
  * 
  */
-public class DestinationAlreadyExistsException extends JMSException {
-    private final ActiveMQDestination destination;
+public class DestinationDoesNotExistException extends JMSException {
+
+    /**
+     * @param destination name of the missing destination; stored as the
+     *                    exception message
+     */
+    public DestinationDoesNotExistException(String destination) {
+        super(destination);
+    }
 
-    public DestinationAlreadyExistsException(ActiveMQDestination destination) {
-        super("Destination already exists: " + destination);
-        this.destination = destination;
+    /**
+     * @return true when the destination name starts with "temp-"; false
+     *         otherwise, including when no name was supplied
+     */
+    public boolean isTemporary() {
+        // the name is only available through the message, which is null when
+        // the exception was built with a null name - guard rather than NPE
+        final String destinationName = getMessage();
+        return destinationName != null && destinationName.startsWith("temp-");
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
+    /**
+     * @return a readable sentence built around the destination name
+     */
+    @Override
+    public String getLocalizedMessage() {
+        return "The destination " + getMessage() + " does not exist.";
     }
 }

Propchange: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/DestinationDoesNotExistException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Fri Jan 25 21:47:17 2013
@@ -51,6 +51,7 @@ public final class AdvisorySupport {
     public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
     public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
     public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
+    public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
@@ -528,4 +529,8 @@ public final class AdvisorySupport {
     public static Destination getAgentDestination() {
         return AGENT_TOPIC_DESTINATION;
     }
+
+    /**
+     * @return a new topic named NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX
+     */
+    public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
+        final String topicName = NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX;
+        return new ActiveMQTopic(topicName);
+    }
 }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Jan 25 21:47:17 2013
@@ -37,13 +37,15 @@ public class NetworkBridgeFilter impleme
 
     protected BrokerId networkBrokerId;
     protected int networkTTL;
+    transient ConsumerInfo consumerInfo;
 
     public NetworkBridgeFilter() {
     }
 
-    public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) {
+    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int networkTTL) {
         this.networkBrokerId = networkBrokerId;
         this.networkTTL = networkTTL;
+        this.consumerInfo = consumerInfo;
     }
 
     public byte getDataStructureType() {
@@ -91,21 +93,29 @@ public class NetworkBridgeFilter impleme
             return false;
         }
 
-        if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
-            ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
-            hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
-            if (hops >= networkTTL) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
-                }
+        // Advisory messages: never hand them to a network subscription, and
+        // for ConsumerInfo advisories enforce the hop limit and skip brokers
+        // already on the path.
+        if (message.isAdvisory()) {
+            if (consumerInfo != null && consumerInfo.isNetworkSubscription()) {
+                // they will be interpreted by the bridge leading to dup commands
+                // expected filtering, not a failure: report at trace, not error
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
+                }
                 return false;
-            }
+            } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
+                ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
+                hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
+                if (hops >= networkTTL) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
+                    }
+                    return false;
+                }
 
-            if (contains(info.getBrokerPath(), networkBrokerId)) {
-                LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
-                        + networkBrokerId + "), path: "
-                        + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
-                return false;
+                if (contains(info.getBrokerPath(), networkBrokerId)) {
+                    // guard so the message is only built when trace is on
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
+                                + networkBrokerId + "), path: "
+                                + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
+                    }
+                    return false;
+                }
             }
         }
         return true;

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Jan 25 21:47:17 2013
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Adds the incrementing sequence number to commands along with performing the
- * corelation of responses to requests to create a blocking request-response
+ * correlation of responses to requests to create a blocking request-response
  * semantics.
  * 
  * 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java Fri Jan 25 21:47:17 2013
@@ -19,10 +19,12 @@ package org.apache.activemq.network;
 import javax.jms.MessageProducer;
 import javax.jms.TemporaryQueue;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 
 
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
 
 public class DuplexNetworkTest extends SimpleNetworkTest {
 
@@ -47,7 +49,12 @@ public class DuplexNetworkTest extends S
         Thread.sleep(100);
         assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
         temp.delete();
-        Thread.sleep(100);
-        assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
+
+        assertTrue("Destination not deleted", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == remoteBroker.getAdminView().getTemporaryQueues().length;
+            }
+        }));
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Fri Jan 25 21:47:17 2013
@@ -72,6 +72,8 @@ public class SimpleNetworkTest {
     protected ActiveMQTopic excluded;
     protected String consumerName = "durableSubs";
 
+    // works b/c of non marshaling vm transport, the connection
+    // ref from the client is used during the forward
     @Test
     public void testMessageCompression() throws Exception {
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java Fri Jan 25 21:47:17 2013
@@ -323,7 +323,7 @@ public abstract class DataFileGeneratorT
     }
 
     protected BooleanExpression createBooleanExpression(String string) {
-        return new NetworkBridgeFilter(new BrokerId(string), 10);
+        return new NetworkBridgeFilter(null, new BrokerId(string), 10);
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=1438734&r1=1438733&r2=1438734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java Fri Jan 25 21:47:17 2013
@@ -137,12 +137,12 @@ public class BrokerQueueNetworkWithDisco
                     public boolean isSatisified() throws Exception {
                         long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount();
                         LOG.info("Num VM connetions:" + numVmConnections);
-                        return numVmConnections == 1;
+                        return numVmConnections == 2;
                     }});
         if (!allGood) {
             dumpAllThreads("ExtraHubVMConnection");
         }
-        assertTrue("should be only one vm connection for the single network duplex network connector", allGood);
+        assertTrue("should be only 2 vm connections for the single network duplex network connector", allGood);
     }
     
     public void testTwoDuplexNCsAreAllowed() throws Exception {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java?rev=1438734&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java Fri Jan 25 21:47:17 2013
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.PriorityDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.network.NetworkBridge;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+public class RequestReplyTempDestRemovalAdvisoryRaceTest extends JmsMultipleBrokersTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
+
+    private static final String BROKER_A = "BrokerA";
+    private static final String BROKER_B = "BrokerB";
+    private static final String BROKER_C = "BrokerC";
+
+    private static final int NUM_RESPONDENTS = 1;
+    private static final int NUM_SENDS = 1;
+    private static final int RANDOM_SLEEP_FOR_RESPONDENT_MS = 0;
+    private static final int RANDOM_SLEEP_FOR_SENDER_MS = 1;
+    private static final String QUEUE_NAME = "foo.queue";
+    private static String[] TEST_ITERATIONS = new String[]{QUEUE_NAME+"0", QUEUE_NAME+"1", QUEUE_NAME+"2", QUEUE_NAME+"3"};
+
+    final AtomicLong messageCount = new AtomicLong(0);
+    final AtomicLong respondentSendError = new AtomicLong(0);
+    final AtomicLong responseReceived = new AtomicLong(0);
+    final AtomicLong sendsWithNoConsumers = new AtomicLong(0);
+    final AtomicLong forwardFailures = new AtomicLong(0);
+
+
+    protected final AtomicBoolean shutdown = new AtomicBoolean(false);
+    HashSet<NetworkConnector> networkConnectors = new HashSet<NetworkConnector>();
+    HashSet<Connection> advisoryConsumerConnections = new HashSet<Connection>();
+    Appender slowDownAppender;
+
+    CountDownLatch consumerDemandExists;
+
+    protected boolean useDuplex = false;
+
+    /**
+     * @return the suite built for this test class by the suite(Class) helper
+     */
+    public static Test suite() {
+        return suite(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
+    }
+
+    /**
+     * Registers TEST_ITERATIONS as combination values under the key
+     * "QUEUE_NAME". The race fails most, but not all, of the time, hence the
+     * several combinations.
+     * <p>
+     * To reproduce reliably by hand, use a debugger: set a "thread" breakpoint
+     * at line 679 in DemandForwardingBridgeSupport, break only on the "2nd" pass
+     * (broker C's bridge), and let execution continue shortly after the
+     * breakpoint is hit. In this test a logging appender implements that pause.
+     */
+    public void initCombos() {
+        final String[] iterations = TEST_ITERATIONS;
+        addCombinationValues("QUEUE_NAME", iterations);
+    }
+
+    /**
+     * Request/reply race with useDuplex enabled: bridges are configured from
+     * broker A to B and from B to C, respondents connect to broker A and
+     * senders to broker C.
+     * <p>
+     * Passes when the set of active bridges is the same before and after the
+     * run, and every send is accounted for as a received response, a
+     * respondent send error, a no-consumer advisory or a forward-failure
+     * advisory.
+     */
+    public void testTempDestRaceDuplex() throws Exception {
+        // duplex
+        useDuplex = true;
+        bridgeBrokers(BROKER_A, BROKER_B, false, 3);
+        bridgeBrokers(BROKER_B, BROKER_C, false, 3);
+
+        startAllBrokers();
+
+        waitForBridgeFormation(1);
+
+        // snapshot of the bridges before the race, compared again at the end
+        HashSet<NetworkBridge> bridgesStart = new HashSet<NetworkBridge>();
+        for (NetworkConnector networkConnector : networkConnectors) {
+            bridgesStart.addAll(networkConnector.activeBridges());
+        }
+        LOG.info("Bridges start:" + bridgesStart);
+
+        slowDownAdvisoryDispatch();
+        noConsumerAdvisory();
+        forwardFailureAdvisory();
+
+        // set up respondents
+        ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
+        BrokerItem brokerA = brokers.get(BROKER_A);
+        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName()
+                + "?jms.watchTopicAdvisories=false");
+        brokerAFactory.setAlwaysSyncSend(true);
+        for (int i = 0; i < NUM_RESPONDENTS; i++) {
+            respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
+        }
+
+        // fire off sends
+        ExecutorService senderThreadPool = Executors.newCachedThreadPool();
+        BrokerItem brokerC = brokers.get(BROKER_C);
+        ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName()
+                + "?jms.watchTopicAdvisories=false");
+        for (int i = 0; i < NUM_SENDS; i++) {
+            senderThreadPool.execute(new MessageSender(brokerCFactory));
+        }
+
+        senderThreadPool.shutdown();
+        senderThreadPool.awaitTermination(30, TimeUnit.SECONDS);
+        TimeUnit.SECONDS.sleep(15);
+        LOG.info("shutting down");
+        shutdown.compareAndSet(false, true);
+        // a fixed pool keeps its threads until it is shut down; release them so
+        // they do not outlive this test (already submitted tasks still finish)
+        respondentThreadPool.shutdown();
+
+        HashSet<NetworkBridge> bridgesEnd = new HashSet<NetworkBridge>();
+        for (NetworkConnector networkConnector : networkConnectors) {
+            bridgesEnd.addAll(networkConnector.activeBridges());
+        }
+        LOG.info("Bridges end:" + bridgesEnd);
+
+        assertEquals("no new bridges created", bridgesStart, bridgesEnd);
+
+        // validate success or error of dlq
+        LOG.info("received: " + responseReceived.get() + ", respondent error: " + respondentSendError.get()
+                + ", noConsumerCount: " + sendsWithNoConsumers.get()
+                + ", forwardFailures: " + forwardFailures.get());
+        assertEquals("success or error", NUM_SENDS, respondentSendError.get() + forwardFailures.get()
+                + responseReceived.get() + sendsWithNoConsumers.get());
+
+    }
+
+    /**
+     * For every broker, opens a connection to its tcp connector and counts each
+     * message received on the network bridge forward-failure advisory topic in
+     * {@code forwardFailures}.
+     *
+     * @throws JMSException if a connection, session or consumer cannot be created
+     */
+    private void forwardFailureAdvisory() throws JMSException {
+        for (BrokerItem brokerItem : brokers.values()) {
+            final String brokerUrl = brokerItem.broker.getTransportConnectorByScheme("tcp").getName()
+                    + "?jms.watchTopicAdvisories=false";
+            // NOTE(review): this connection is not added to advisoryConsumerConnections
+            // and is not closed here - confirm it is cleaned up elsewhere
+            final Connection advisoryConnection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
+            advisoryConnection.start();
+
+            final Session session = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer failureConsumer = session.createConsumer(
+                    AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic());
+            failureConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    forwardFailures.incrementAndGet();
+                }
+            });
+        }
+    }
+
+    /**
+     * For every broker, opens a connection to its tcp connector and counts each
+     * message received on the no-topic-consumers advisory topic for temp topics
+     * matching ">" in {@code sendsWithNoConsumers}.
+     *
+     * @throws JMSException if a connection, session or consumer cannot be created
+     */
+    private void noConsumerAdvisory() throws JMSException {
+        for (BrokerItem brokerItem : brokers.values()) {
+            final String brokerUrl = brokerItem.broker.getTransportConnectorByScheme("tcp").getName()
+                    + "?jms.watchTopicAdvisories=false";
+            // NOTE(review): this connection is not added to advisoryConsumerConnections
+            // and is not closed here - confirm it is cleaned up elsewhere
+            final Connection advisoryConnection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
+            advisoryConnection.start();
+
+            final Session session = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer noConsumerListener = session.createConsumer(
+                    AdvisorySupport.getNoTopicConsumersAdvisoryTopic(new ActiveMQTempTopic(">")));
+            noConsumerListener.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    sendsWithNoConsumers.incrementAndGet();
+                }
+            });
+        }
+    }
+
+
+    /**
+     * Bridges A&lt;-&gt;B and B&lt;-&gt;C, runs echo respondents against broker A and request
+     * senders against broker C, then checks that
+     * <ul>
+     * <li>the set of active network bridges is the same before and after the run, and</li>
+     * <li>every send is accounted for as a received response, a respondent send error, a
+     * no-consumer advisory or a forward-failure advisory.</li>
+     * </ul>
+     *
+     * @throws Exception if broker setup, bridging or any JMS operation fails
+     */
+    public void testTempDestRace() throws Exception {
+        // non duplex
+        bridgeBrokers(BROKER_A, BROKER_B, false, 3);
+        bridgeBrokers(BROKER_B, BROKER_A, false, 3);
+        bridgeBrokers(BROKER_B, BROKER_C, false, 3);
+        bridgeBrokers(BROKER_C, BROKER_B, false, 3);
+
+        startAllBrokers();
+
+        waitForBridgeFormation(1);
+
+        // snapshot the bridges that exist before any load is applied
+        HashSet<NetworkBridge> bridgesStart = new HashSet<NetworkBridge>();
+        for (NetworkConnector networkConnector : networkConnectors) {
+            bridgesStart.addAll(networkConnector.activeBridges());
+        }
+
+        slowDownAdvisoryDispatch();
+        noConsumerAdvisory();
+        forwardFailureAdvisory();
+
+
+        // set up respondents
+        ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
+        BrokerItem brokerA = brokers.get(BROKER_A);
+        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName()
+                + "?jms.watchTopicAdvisories=false");
+        brokerAFactory.setAlwaysSyncSend(true);
+        for (int i = 0; i < NUM_RESPONDENTS; i++) {
+            respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
+        }
+
+        // fire off sends
+        ExecutorService senderThreadPool = Executors.newCachedThreadPool();
+        BrokerItem brokerC = brokers.get(BROKER_C);
+        ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName()
+                + "?jms.watchTopicAdvisories=false");
+        for (int i = 0; i < NUM_SENDS; i++) {
+            senderThreadPool.execute(new MessageSender(brokerCFactory));
+        }
+
+        senderThreadPool.shutdown();
+        if (!senderThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
+            // make a timeout visible; the accounting assertion below will report the shortfall
+            LOG.warn("senders did not complete within 30 seconds");
+        }
+        TimeUnit.SECONDS.sleep(10);
+        LOG.info("shutting down");
+        shutdown.compareAndSet(false, true);
+        // respondents poll the shutdown flag; without shutting the pool down its worker
+        // threads would stay alive after the test has finished
+        respondentThreadPool.shutdown();
+
+        HashSet<NetworkBridge> bridgesEnd = new HashSet<NetworkBridge>();
+        for (NetworkConnector networkConnector : networkConnectors) {
+            bridgesEnd.addAll(networkConnector.activeBridges());
+        }
+        assertEquals("no new bridges created", bridgesStart, bridgesEnd);
+
+        // validate success or error or dlq
+        LOG.info("received: " + responseReceived.get() + ", respondent error: " + respondentSendError.get()
+                + ", noConsumerCount: " + sendsWithNoConsumers.get()
+                + ", forwardFailures: " + forwardFailures.get());
+        assertEquals("success or error", NUM_SENDS, respondentSendError.get() + forwardFailures.get()
+                + responseReceived.get() + sendsWithNoConsumers.get());
+
+    }
+
+    /**
+     * Installs a root-logger appender that stalls the logging thread for two seconds whenever
+     * a DEBUG message starting with "BrokerB" and containing "remove local subscription" is
+     * logged, simulating a slow thread processing a remove-subscription advisory.
+     *
+     * The first such message also releases the {@code consumerDemandExists} latch. The
+     * appender is stored in {@code slowDownAppender} so that it can be removed again.
+     *
+     * @throws Exception declared for callers; nothing in this method throws a checked exception
+     */
+    private void slowDownAdvisoryDispatch() throws Exception {
+
+        // the message being matched is only emitted at DEBUG level
+        org.apache.log4j.Logger.getLogger(DemandForwardingBridgeSupport.class).setLevel(Level.DEBUG);
+
+        // instrument a logger to block the processing of a remove sub advisory
+        // simulate a slow thread
+        slowDownAppender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent loggingEvent) {
+                if (!Level.DEBUG.equals(loggingEvent.getLevel())) {
+                    return;
+                }
+                Object rawMessage = loggingEvent.getMessage();
+                if (rawMessage == null) {
+                    // a null log message cannot match; avoid throwing from inside an appender
+                    return;
+                }
+                String message = rawMessage.toString();
+                if (message.startsWith("BrokerB") && message.contains("remove local subscription")) {
+                    consumerDemandExists.countDown();
+                    System.err.println("Sleeping on receipt of remove info debug message: " + message);
+                    // sleep for a bit
+                    try {
+                        TimeUnit.SECONDS.sleep(2);
+                    } catch (InterruptedException e) {
+                        // keep the interrupt visible to the thread that was logging
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        };
+
+        org.apache.log4j.Logger.getRootLogger().addAppender(slowDownAppender);
+    }
+
+    /**
+     * Resets the shared counters and collections, creates the three non-persistent brokers
+     * and installs a destination policy for all temp topics on each of them.
+     *
+     * The temp-topic policy enables the no-consumer advisory, uses a shared dead letter
+     * strategy that also processes non-persistent messages, and dispatches with a
+     * {@link PriorityDispatchPolicy}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        // start every test from zeroed counters and empty bookkeeping
+        sendsWithNoConsumers.set(0);
+        forwardFailures.set(0);
+        respondentSendError.set(0);
+        responseReceived.set(0);
+        advisoryConsumerConnections.clear();
+        networkConnectors.clear();
+        consumerDemandExists = new CountDownLatch(1);
+
+        for (String brokerName : new String[] {BROKER_A, BROKER_B, BROKER_C}) {
+            URI brokerUri = new URI("broker:(tcp://localhost:0)/" + brokerName + "?persistent=false&useJmx=false");
+            createBroker(brokerUri).setDedicatedTaskRunner(false);
+        }
+
+        DeadLetterStrategy dlqStrategy = new SharedDeadLetterStrategy();
+        dlqStrategy.setProcessNonPersistent(true);
+
+        PolicyEntry tempTopicPolicy = new PolicyEntry();
+        tempTopicPolicy.setSendAdvisoryIfNoConsumers(true);
+        tempTopicPolicy.setDeadLetterStrategy(dlqStrategy);
+        tempTopicPolicy.setDispatchPolicy(new PriorityDispatchPolicy());
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(new ActiveMQTempTopic(">"), tempTopicPolicy);
+
+        // the same policy map instance is shared by all brokers
+        for (BrokerItem brokerItem : brokers.values()) {
+            brokerItem.broker.setDestinationPolicy(policyMap);
+        }
+    }
+
+    /**
+     * Removes the slow-down appender if one was installed, closes every connection in
+     * {@code advisoryConsumerConnections} and then delegates to {@code super.tearDown()}.
+     *
+     * A connection that fails to close is logged and skipped so that the remaining
+     * connections are still closed, and {@code super.tearDown()} runs even if this
+     * class's own cleanup throws.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (slowDownAppender != null) {
+                org.apache.log4j.Logger.getRootLogger().removeAppender(slowDownAppender);
+            }
+            for (Connection connection : advisoryConsumerConnections) {
+                try {
+                    connection.close();
+                } catch (JMSException e) {
+                    LOG.warn("failed to close advisory consumer connection: " + connection, e);
+                }
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
+        connector.setBridgeTempDestinations(true);
+        connector.setAdvisoryForFailedForward(true);
+        connector.setDuplex(useDuplex);
+        connector.setAlwaysSyncSend(true);
+        networkConnectors.add(connector);
+        return connector;
+    }
+
+    abstract class MessageClient {
+        protected Connection connection;
+        protected Session session;
+        protected MessageConsumer consumer;
+        protected MessageProducer producer;
+        protected Random random;
+        protected int timeToSleep;
+
+        // set up the connection and session
+        public MessageClient(ActiveMQConnectionFactory factory, int timeToSleep) throws Exception {
+            this.connection = factory.createConnection();
+            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.timeToSleep = timeToSleep;
+            this.random = new Random(System.currentTimeMillis());
+            preInit();
+            initProducer();
+            initConsumer();
+            this.connection.start();
+        }
+
+        protected void preInit() throws JMSException {
+
+        }
+
+        abstract protected void initProducer() throws JMSException;
+
+        abstract protected void initConsumer() throws JMSException;
+    }
+
+    class MessageSender extends MessageClient implements Runnable {
+
+
+        protected Destination tempDest;
+
+        public MessageSender(ActiveMQConnectionFactory factory) throws Exception {
+            super(factory, RANDOM_SLEEP_FOR_SENDER_MS);
+        }
+
+        @Override
+        public void run() {
+            // create a message
+            try {
+                TextMessage message = session.createTextMessage("request: message #" + messageCount.getAndIncrement());
+                message.setJMSReplyTo(tempDest);
+                producer.send(message);
+                LOG.info("SENDER: Message [" + message.getText() + "] has been sent.");
+
+                Message incomingMessage = consumer.receive(timeToSleep);
+                if (incomingMessage instanceof TextMessage) {
+                    try {
+                        LOG.info("SENDER: Got a response from echo service!" + ((TextMessage) incomingMessage).getText());
+                        responseReceived.incrementAndGet();
+                    } catch (JMSException e) {
+                        LOG.error("SENDER: might want to see why i'm getting non-text messages..." + incomingMessage, e);
+                    }
+                } else {
+                    LOG.info("SENDER: Did not get a response this time");
+                }
+
+
+            } catch (JMSException e) {
+                LOG.error("SENDER: Could not complete message sending properly: " + e.getMessage());
+            } finally {
+                try {
+                    producer.close();
+                    consumer.close();
+                    session.close();
+                    connection.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        @Override
+        protected void preInit() throws JMSException {
+            this.tempDest = session.createTemporaryTopic();
+
+        }
+
+        @Override
+        protected void initProducer() throws JMSException {
+            this.producer = session.createProducer(new ActiveMQQueue(QUEUE_NAME));
+        }
+
+        @Override
+        protected void initConsumer() throws JMSException {
+            this.consumer = session.createConsumer(tempDest);
+            LOG.info("consumer for: " + tempDest + ", " + consumer);
+
+        }
+
+    }
+
+    /**
+     * Reply side of the request/reply exchange: consumes text messages from
+     * {@code QUEUE_NAME} and sends a "reply: ..." text message to each request's reply-to
+     * destination until the shared {@code shutdown} flag is set.
+     */
+    class EchoRespondent extends MessageClient implements Runnable {
+
+        public EchoRespondent(ActiveMQConnectionFactory factory) throws Exception {
+            super(factory, RANDOM_SLEEP_FOR_RESPONDENT_MS);
+        }
+
+        /**
+         * Polls for requests in one-second receives. For each text message it sleeps
+         * {@code timeToSleep} milliseconds, waits up to five seconds on
+         * {@code consumerDemandExists} and then sends the reply; a failed reply send is
+         * counted in {@code respondentSendError}.
+         *
+         * The loop ends when {@code shutdown} is set, when receiving fails, or when the
+         * thread is interrupted. The connection is closed on the way out.
+         */
+        @Override
+        public void run() {
+            try {
+                LOG.info("RESPONDENT LISTENING");
+                while (!shutdown.get()) {
+                    Message incomingMessage = consumer.receive(1000);
+                    if (incomingMessage instanceof TextMessage) {
+                        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) incomingMessage;
+                        try {
+                            LOG.info("RESPONDENT: Received a message: [" + textMessage.getText() + "]" + Arrays.asList(textMessage.getBrokerPath()));
+                            Message message = session.createTextMessage("reply: " + textMessage.getText());
+                            Destination replyTo = incomingMessage.getJMSReplyTo();
+                            TimeUnit.MILLISECONDS.sleep(timeToSleep);
+                            // result deliberately ignored: reply after the wait whether or not the latch was released
+                            consumerDemandExists.await(5, TimeUnit.SECONDS);
+                            try {
+                                producer.send(replyTo, message);
+                                LOG.info("RESPONDENT: sent reply:" + message.getJMSMessageID() + " back to: " + replyTo);
+                            } catch (JMSException e) {
+                                LOG.error("RESPONDENT: could not send reply message: " + e.getLocalizedMessage(), e);
+                                respondentSendError.incrementAndGet();
+                            }
+                        } catch (JMSException e) {
+                            LOG.error("RESPONDENT: could not create the reply message: " + e.getLocalizedMessage(), e);
+                        } catch (InterruptedException e) {
+                            // an interrupt is a request to stop: keep the flag set and leave the loop
+                            LOG.info("RESPONDENT: interrupted while waiting to reply, stopping");
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.info("RESPONDENT: Could not set the message listener on the respondent", e);
+            } finally {
+                try {
+                    connection.close();
+                } catch (JMSException e) {
+                    LOG.warn("RESPONDENT: failed to close connection: " + e.getLocalizedMessage(), e);
+                }
+            }
+        }
+
+        /** Creates an unbound producer; the destination is supplied per send. */
+        @Override
+        protected void initProducer() throws JMSException {
+            this.producer = session.createProducer(null);
+            // so that we can get an advisory on sending with no consumers
+            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        }
+
+        /** Creates the consumer that receives requests from {@code QUEUE_NAME}. */
+        @Override
+        protected void initConsumer() throws JMSException {
+            this.consumer = session.createConsumer(new ActiveMQQueue(QUEUE_NAME));
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date