You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/23 13:49:56 UTC

[activemq] branch activemq-5.16.x updated (485d32f -> 6a25d65)

This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a change to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git.


    from 485d32f  [AMQ-8405] Upgrade to ASM 9.2
     new c58e531  [AMQ-8413] Add remoteUserName and remotePassword config fields to network connector (#725)
     new f2dbc92  [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
     new 6a25d65  [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../activemq/broker/jmx/DestinationView.java       |  9 ++++
 .../activemq/broker/jmx/DestinationViewMBean.java  | 20 ++++++++
 .../activemq/broker/jmx/NetworkConnectorView.java  | 31 +++++++++---
 .../broker/jmx/NetworkConnectorViewMBean.java      |  8 +++
 .../activemq/broker/region/BaseDestination.java    | 14 +++++-
 .../apache/activemq/broker/region/Destination.java |  4 ++
 .../activemq/broker/region/DestinationFilter.java  | 10 ++++
 .../broker/region/DestinationStatistics.java       | 11 ++++
 .../activemq/broker/region/policy/PolicyEntry.java | 17 ++++++-
 .../network/DemandForwardingBridgeSupport.java     | 10 +++-
 .../network/NetworkBridgeConfiguration.java        | 30 +++++++++++
 .../apache/activemq/transaction/Transaction.java   | 58 +++++++++++++---------
 12 files changed, 188 insertions(+), 34 deletions(-)

[activemq] 03/03: [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 6a25d654f22c034bbb4e5d23f931507e911d0308
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:38:32 2021 -0600

    [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
    
    - Default 'true' to match existing behavior
     - Added counter to DestinationView
---
 .../apache/activemq/broker/jmx/DestinationView.java  |  9 +++++++++
 .../activemq/broker/jmx/DestinationViewMBean.java    | 20 ++++++++++++++++++++
 .../activemq/broker/region/BaseDestination.java      | 14 +++++++++++++-
 .../apache/activemq/broker/region/Destination.java   |  4 ++++
 .../activemq/broker/region/DestinationFilter.java    | 10 ++++++++++
 .../broker/region/DestinationStatistics.java         | 11 +++++++++++
 .../activemq/broker/region/policy/PolicyEntry.java   | 17 ++++++++++++++++-
 7 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index d795c4d..db0c67f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -103,6 +103,11 @@ public class DestinationView implements DestinationViewMBean {
     }
 
     @Override
+    public long getDuplicateFromStoreCount() {
+        return destination.getDestinationStatistics().getDuplicateFromStore().getCount();
+    }
+
+    @Override
     public long getInFlightCount() {
         return destination.getDestinationStatistics().getInflight().getCount();
     }
@@ -570,4 +575,8 @@ public class DestinationView implements DestinationViewMBean {
         return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
     }
 
+    @Override
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return destination.isSendDuplicateFromStoreToDLQ();
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index ad0ae32..526e648 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,26 @@ public interface DestinationViewMBean {
     long getDequeueCount();
 
     /**
+     * Returns the number of duplicate messages that have been paged-in 
+     * from the store.
+     *
+     * @return The number of duplicate messages that have been paged-in 
+     *         from the store.
+     */
+    @MBeanInfo("Number of duplicate messages that have been paged-in from the store.")
+    long getDuplicateFromStoreCount();
+
+    /**
+     * Returns the config setting to send a duplicate message from store 
+     * to the dead letter queue.
+     *
+     * @return The config setting to send a duplicate message from store 
+     *         to the dead letter queue.
+     */
+    @MBeanInfo("Config setting to send a duplicate from store message to the dead letter queue.")
+    boolean isSendDuplicateFromStoreToDLQ();
+
+    /**
      * Returns the number of messages that have been acknowledged by network subscriptions from the
      * destination.
      *
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 443a8f2..55b3cba 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -88,6 +88,7 @@ public abstract class BaseDestination implements Destination {
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
     private boolean sendAdvisoryIfNoConsumers;
+    private boolean sendDuplicateFromStoreToDLQ = true;
     private boolean includeBodyForAdvisory;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
@@ -477,6 +478,14 @@ public abstract class BaseDestination implements Destination {
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return this.sendDuplicateFromStoreToDLQ;
+    }
+
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+    }
+
     public boolean isIncludeBodyForAdvisory() {
         return includeBodyForAdvisory;
     }
@@ -889,11 +898,14 @@ public abstract class BaseDestination implements Destination {
 
     @Override
     public void duplicateFromStore(Message message, Subscription subscription) {
+        destinationStatistics.getDuplicateFromStore().increment();
         ConnectionContext connectionContext = createConnectionContext();
         getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
         Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
         message.setRegionDestination(this);
-        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+        if(this.isSendDuplicateFromStoreToDLQ()) {
+            broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+        }
         MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
         messageAck.setPoisonCause(cause);
         try {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 031015e..180cf25 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -245,4 +245,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
     public void clearPendingMessages(int pendingAdditionsCount);
 
     void duplicateFromStore(Message message, Subscription subscription);
+
+    boolean isSendDuplicateFromStoreToDLQ();
+
+    void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 0d0db05..2c11bc3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -394,6 +394,16 @@ public class DestinationFilter implements Destination {
         next.duplicateFromStore(message, subscription);
     }
 
+    @Override
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return next.isSendDuplicateFromStoreToDLQ();
+    }
+
+    @Override
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
+    }
+
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
         if (next instanceof DestinationFilter) {
             DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 0a9176e..88dd911 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -37,6 +37,7 @@ public class DestinationStatistics extends StatsImpl {
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
+    protected CountStatisticImpl duplicateFromStore;
     protected CountStatisticImpl inflight;
     protected CountStatisticImpl expired;
     protected TimeStatisticImpl processTime;
@@ -50,6 +51,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+        duplicateFromStore = new CountStatisticImpl("duplicateFromStore", "The number of duplicate messages that have been paged-in from the store for this destination");
         forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
         inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
         expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@@ -68,6 +70,7 @@ public class DestinationStatistics extends StatsImpl {
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
+        addStatistic("duplicateFromStore", duplicateFromStore);
         addStatistic("inflight", inflight);
         addStatistic("expired", expired);
         addStatistic("consumers", consumers);
@@ -124,6 +127,10 @@ public class DestinationStatistics extends StatsImpl {
         return dispatched;
     }
 
+    public CountStatisticImpl getDuplicateFromStore() {
+        return duplicateFromStore;
+    }
+
     public TimeStatisticImpl getProcessTime() {
         return this.processTime;
     }
@@ -145,6 +152,7 @@ public class DestinationStatistics extends StatsImpl {
             dequeues.reset();
             forwards.reset();
             dispatched.reset();
+            duplicateFromStore.reset();
             inflight.reset();
             expired.reset();
             blockedSends.reset();
@@ -158,6 +166,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues.setEnabled(enabled);
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
+        duplicateFromStore.setEnabled(enabled);
         forwards.setEnabled(enabled);
         inflight.setEnabled(enabled);
         expired.setEnabled(true);
@@ -177,6 +186,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(parent.enqueues);
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
+            duplicateFromStore.setParent(parent.duplicateFromStore);
             forwards.setParent(parent.forwards);
             inflight.setParent(parent.inflight);
             expired.setParent(parent.expired);
@@ -192,6 +202,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(null);
             dispatched.setParent(null);
             dequeues.setParent(null);
+            duplicateFromStore.setParent(null);
             forwards.setParent(null);
             inflight.setParent(null);
             expired.setParent(null);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index ab869b0..862d5dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -51,6 +51,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
+    private boolean sendDuplicateFromStoreToDLQ = true;
     private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
     private MessageEvictionStrategy messageEvictionStrategy;
@@ -241,7 +242,6 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("maxBrowsePageSize", includedProperties)) {
             destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
         }
-
         if (isUpdate("minimumMessageSize", includedProperties)) {
             destination.setMinimumMessageSize((int) getMinimumMessageSize());
         }
@@ -296,6 +296,9 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
             destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
         }
+        if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
+            destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -456,6 +459,18 @@ public class PolicyEntry extends DestinationMapEntry {
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return sendDuplicateFromStoreToDLQ;
+    }
+
+    /**
+     * Sends a copy of message to DLQ if a duplicate messages are paged-in from
+     * the messages store
+     */
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+    }
+
     public DeadLetterStrategy getDeadLetterStrategy() {
         return deadLetterStrategy;
     }

[activemq] 02/03: [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit f2dbc92743cfacf3b5397f0aae82a24dcfa4b7c1
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:37:56 2021 -0600

    [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
---
 .../apache/activemq/transaction/Transaction.java   | 58 +++++++++++++---------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 13ec353..0933480 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -52,15 +52,15 @@ public abstract class Transaction {
         public Object call() throws Exception {
             doPreCommit();
             return null;
-        }   
+        }
     });
     protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
         public Object call() throws Exception {
             doPostCommit();
             return null;
-        }   
+        }
     });
-    
+
     public byte getState() {
         return state;
     }
@@ -87,15 +87,19 @@ public abstract class Transaction {
     }
 
     public Synchronization findMatching(Synchronization r) {
-        int existing = synchronizations.indexOf(r);
-        if (existing != -1) {
-            return synchronizations.get(existing);
-        }
+    	synchronized(synchronizations) {
+            int existing = synchronizations.indexOf(r);
+            if (existing != -1) {
+                return synchronizations.get(existing);
+            }
+    	}
         return null;
     }
 
     public void removeSynchronization(Synchronization r) {
-        synchronizations.remove(r);
+        synchronized(synchronizations) {
+            synchronizations.remove(r);
+        }
     }
 
     public void prePrepare() throws Exception {
@@ -119,26 +123,32 @@ public abstract class Transaction {
             throw xae;
         }
     }
-    
+
     protected void fireBeforeCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.beforeCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.beforeCommit();
+            }
         }
     }
 
     protected void fireAfterCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.afterCommit();
+            }
         }
     }
 
     public void fireAfterRollback() throws Exception {
-    	Collections.reverse(synchronizations);
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterRollback();
+    	synchronized(synchronizations) {
+            Collections.reverse(synchronizations);
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+               s.afterRollback();
+            }
         }
     }
 
@@ -156,15 +166,15 @@ public abstract class Transaction {
     public abstract TransactionId getTransactionId();
 
     public abstract Logger getLog();
-    
+
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
     }
-    
+
     public int size() {
         return synchronizations.size();
     }
-    
+
     protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
         try {
             postCommitTask.get();
@@ -179,9 +189,9 @@ public abstract class Transaction {
             } else {
                 throw new XAException(e.toString());
             }
-        }    
+        }
     }
-    
+
     protected void doPreCommit() throws XAException {
         try {
             fireBeforeCommit();

[activemq] 01/03: [AMQ-8413] Add remoteUserName and remotePassword config fields to network connector (#725)

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit c58e531584955419d5f388ab2e398d4edcada3e0
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:36:56 2021 -0600

    [AMQ-8413] Add remoteUserName and remotePassword config fields to network connector (#725)
---
 .../activemq/broker/jmx/NetworkConnectorView.java  | 31 +++++++++++++++++-----
 .../broker/jmx/NetworkConnectorViewMBean.java      |  8 ++++++
 .../network/DemandForwardingBridgeSupport.java     | 10 +++++--
 .../network/NetworkBridgeConfiguration.java        | 30 +++++++++++++++++++++
 4 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
index b3d0762..18e00e6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
@@ -20,6 +20,7 @@ import org.apache.activemq.network.NetworkConnector;
 
 public class NetworkConnectorView implements NetworkConnectorViewMBean {
 
+    private static final String PASSWORD_MASK = "****";
     private final NetworkConnector connector;
 
     public NetworkConnectorView(NetworkConnector connector) {
@@ -158,12 +159,8 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
 
     @Override
     public String getPassword() {
-        String pw = connector.getPassword();
-        // Hide the password for security reasons.
-        if (pw != null) {
-            pw = pw.replaceAll(".", "*");
-        }
-        return pw;
+        // Hide the password for security reasons
+        return PASSWORD_MASK;
     }
 
     @Override
@@ -180,4 +177,26 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
     public void setSuppressDuplicateTopicSubscriptions(boolean val) {
         connector.setSuppressDuplicateTopicSubscriptions(val);
     }
+
+    @Override
+    public String getRemotePassword() {
+        // Hide the password for security reasons.
+        return PASSWORD_MASK;
+    }
+
+    @Override
+    public void setRemotePassword(String remotePassword) {
+        connector.setRemotePassword(remotePassword);
+    }
+
+    @Override
+    public String getRemoteUserName() {
+        return connector.getRemoteUserName();
+    }
+
+    @Override
+    public void setRemoteUserName(String remoteUserName) {
+        connector.setRemoteUserName(remoteUserName);
+    }
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
index 99974ce..3dac547 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
@@ -81,4 +81,12 @@ public interface NetworkConnectorViewMBean extends Service {
 
     void setSuppressDuplicateTopicSubscriptions(boolean val);
 
+    String getRemoteUserName();
+
+    void setRemoteUserName(String remoteUserName);
+
+    String getRemotePassword();
+
+    void setRemotePassword(String remotePassword);
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 2ee198b..578a05f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -608,8 +608,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 remoteConnectionInfo = new ConnectionInfo();
                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                 remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
-                remoteConnectionInfo.setUserName(configuration.getUserName());
-                remoteConnectionInfo.setPassword(configuration.getPassword());
+                
+                if(configuration.getRemoteUserName() != null) {
+                    remoteConnectionInfo.setUserName(configuration.getRemoteUserName());
+                    remoteConnectionInfo.setPassword(configuration.getRemotePassword());
+                } else {
+                    remoteConnectionInfo.setUserName(configuration.getUserName());
+                    remoteConnectionInfo.setPassword(configuration.getPassword());
+                }
                 remoteBroker.oneway(remoteConnectionInfo);
 
                 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index a564f57..018355c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -58,6 +58,8 @@ public class NetworkBridgeConfiguration {
     private String brokerURL = "";
     private String userName;
     private String password;
+    private String remoteUserName;
+    private String remotePassword;
     private String destinationFilter = null;
     private String name = "NC";
     private String clientIdToken = "_";
@@ -301,6 +303,34 @@ public class NetworkBridgeConfiguration {
     }
 
     /**
+     * @return the remoteUserName
+     */
+    public String getRemoteUserName() {
+        return this.remoteUserName;
+    }
+
+    /**
+     * @param remoteUserName the remoteUserName to set
+     */
+    public void setRemoteUserName(String remoteUserName) {
+        this.remoteUserName = remoteUserName;
+    }
+
+    /**
+     * @return the remotePassword
+     */
+    public String getRemotePassword() {
+        return this.remotePassword;
+    }
+
+    /**
+     * @param userName the userName to set
+     */
+    public void setRemotePassword(String remotePassword) {
+        this.remotePassword = remotePassword;
+    }
+
+    /**
      * @return the destinationFilter
      */
     public String getDestinationFilter() {