You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/02/01 18:22:00 UTC

svn commit: r1441545 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network: DemandForwardingBridgeSupport.java NetworkBridgeConfiguration.java

Author: gtully
Date: Fri Feb  1 17:21:59 2013
New Revision: 1441545

URL: http://svn.apache.org/viewvc?rev=1441545&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3331 - make bridge alwaysSyncSend=true the default

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1441545&r1=1441544&r2=1441545&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Feb  1 17:21:59 2013
@@ -965,25 +965,12 @@ public abstract class DemandForwardingBr
                                 + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
 
-                        if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
-
-                            // If the message was originally sent using async send, we will
-                            // preserve that QOS by bridging it using an async send (small chance
-                            // of message loss).
-                            try {
-                                remoteBroker.oneway(message);
-                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
-                                dequeueCounter.incrementAndGet();
-                            } finally {
-                                sub.decrementOutstandingResponses();
-                            }
-
-                        } else {
+                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
 
                             // The message was not sent using async send, so we should only
                             // ack the local broker when we get confirmation that the remote
                             // broker has received the message.
-                            ResponseCallback callback = new ResponseCallback() {
+                            remoteBroker.asyncRequest(message, new ResponseCallback() {
                                 @Override
                                 public void onCompletion(FutureResponse future) {
                                     try {
@@ -1001,9 +988,19 @@ public abstract class DemandForwardingBr
                                         sub.decrementOutstandingResponses();
                                     }
                                 }
-                            };
+                            });
 
-                            remoteBroker.asyncRequest(message, callback);
+                        } else {
+                            // If the message was originally sent using async send, we will
+                            // preserve that QOS by bridging it using an async send (small chance
+                            // of message loss).
+                            try {
+                                remoteBroker.oneway(message);
+                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+                                dequeueCounter.incrementAndGet();
+                            } finally {
+                                sub.decrementOutstandingResponses();
+                            }
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1441545&r1=1441544&r2=1441545&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Feb  1 17:21:59 2013
@@ -54,7 +54,7 @@ public class NetworkBridgeConfiguration 
     private boolean suppressDuplicateQueueSubscriptions = false;
     private boolean suppressDuplicateTopicSubscriptions = true;
 
-    private boolean alwaysSyncSend = false;
+    private boolean alwaysSyncSend = true;
     private boolean staticBridge = false;
     private boolean useCompression = false;
     private boolean advisoryForFailedForward = false;