You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Henning Verbeek <ha...@gmail.com> on 2019/03/06 11:02:19 UTC

Multicast bridge, network of brokers: Failed to add Connection after temporary network outage

Hi there,

we are using a network of 8 brokers, connected via a multicast bridge.
ActiveMQ is running in LXC containers on hosts, which are randomly
distributed in multiple datacenters of a public hosting company. The
hosts are interconnected by a VPN (tinc). Due to the nature of this
reasonably distributed approach, temporary network failures are quite
possible. It appears that in those situations, some of the brokers
loose connection to others, and when they attempt to reconnect, the
network bridge fails:

Here is an example where broker02 is trying to make a connection to broker01:

2019-03-06 11:45:34,933 | INFO  | Establishing network connection from
vm://broker02.c02.skalio.net to tcp://broker01.c02.skalio.net:61616 |
org.apache.activemq.network.DiscoveryNetworkConnector |
Notifier-MulticastDiscoveryAgent-listener:DiscoveryNetworkConnector:amq-mcast-c02-teambeam:BrokerService[broker02.c02.skalio.net]
2019-03-06 11:45:34,947 | WARN  | Failed to add Connection
id=broker02.c02.skalio.net->broker01.c02.skalio.net-41703-1550607882497-1348899:1,
clientId=amq-mcast-c02-teambeam_broker01.c02.skalio.net_inbound_broker02.c02.skalio.net
due to {} | org.apache.activemq.broker.TransportConnection |
triggerStartAsyncNetworkBridgeCreation:
remoteBroker=tcp://broker01.c02.skalio.net/10.35.18.141:61616@37668,
localBroker= vm://broker02.c02.skalio.net#1327176
javax.jms.InvalidClientIDException: Broker: broker02.c02.skalio.net -
Client: amq-mcast-c02-teambeam_broker01.c02.skalio.net_inbound_broker02.c02.skalio.net
already connected from vm://broker02.c02.skalio.net#2
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:843)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:165)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:157)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:134)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.startLocalBridge(DemandForwardingBridgeSupport.java:508)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.doStartLocalAndRemoteBridges(DemandForwardingBridgeSupport.java:460)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.access$600(DemandForwardingBridgeSupport.java:113)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport$5.run(DemandForwardingBridgeSupport.java:370)[activemq-broker-5.15.4.jar:5.15.4]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_181]
at java.lang.Thread.run(Thread.java:748)[:1.8.0_181]
2019-03-06 11:45:34,947 | INFO  | Network connection between
vm://broker02.c02.skalio.net#1327176 and
tcp://broker01.c02.skalio.net/10.35.18.141:61616@37668 shutdown due to
a local error: {} |
org.apache.activemq.network.DemandForwardingBridgeSupport |
triggerStartAsyncNetworkBridgeCreation:
remoteBroker=tcp://broker01.c02.skalio.net/10.35.18.141:61616@37668,
localBroker= vm://broker02.c02.skalio.net#1327176
javax.jms.InvalidClientIDException: Broker: broker02.c02.skalio.net -
Client: amq-mcast-c02-teambeam_broker01.c02.skalio.net_inbound_broker02.c02.skalio.net
already connected from vm://broker02.c02.skalio.net#2
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:843)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:165)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:157)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:134)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)[activemq-client-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.startLocalBridge(DemandForwardingBridgeSupport.java:508)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.doStartLocalAndRemoteBridges(DemandForwardingBridgeSupport.java:460)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport.access$600(DemandForwardingBridgeSupport.java:113)[activemq-broker-5.15.4.jar:5.15.4]
at org.apache.activemq.network.DemandForwardingBridgeSupport$5.run(DemandForwardingBridgeSupport.java:370)[activemq-broker-5.15.4.jar:5.15.4]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_181]
at java.lang.Thread.run(Thread.java:748)[:1.8.0_181]
2019-03-06 11:45:34,948 | INFO  | broker02.c02.skalio.net bridge to
broker01.c02.skalio.net stopped |
org.apache.activemq.network.DemandForwardingBridgeSupport | ActiveMQ
BrokerService[broker02.c02.skalio.net] Task-29293

These messages come in every few seoncds and can be seen on all
brokers. The only solution to stop this is to bring all-but-one
brokers down and then start them one after the other again.

At the same time, the functionality of the broker is _uninterrupted_!
Messages are accepted for delivery and pass through the network just
fine. We do see a number of message in the DLQ from the time of the
network outage, but these are non-persistent messages to a topic...

I'd like to ask for help regarding this situation.
Thanks a lot!
Cheers,
Hank



Environment:
Linux broker01 4.15.18-1-pve #1 SMP PVE 4.15.18-17 (Mon, 30 Jul 2018
12:53:35 +0200) x86_64 GNU/Linux
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-2~deb9u1-b13)
ActiveMQ 5.15.4

activemq.xml:
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>


    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="broker02.c02.skalio.net"
            dataDirectory="${activemq.data}">

        <networkConnectors>

          <networkConnector name="amq-mcast-c02-teambeam"
            dynamicOnly="true"
            messageTTL="100"

            prefetchSize="1"
            decreaseNetworkConsumerPriority="true"
            uri="multicast://239.16.18.254:6261" />

        </networkConnectors>


        <destinationPolicy>
            <policyMap>
              <policyEntries>

                <policyEntry queue=">" producerFlowControl="false"
memoryLimit="20mb">
                  <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true" />
                  </deadLetterStrategy>
                </policyEntry>

                <policyEntry topic=">" producerFlowControl="false"
memoryLimit="20mb">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>

              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


        <systemUsage>
          <systemUsage sendFailIfNoSpaceAfterTimeout="3000">

            <memoryUsage>
              <memoryUsage limit="250 mb" />
            </memoryUsage>

            <storeUsage>
              <storeUsage limit="2 gb" />
            </storeUsage>

            <tempUsage>
              <tempUsage limit="2 gb" />
            </tempUsage>

          </systemUsage>
        </systemUsage>

        <transportConnectors>

          <!-- openwire -->
          <transportConnector name="openwire"
uri="tcp://0.0.0.0:61616"
discoveryUri="multicast://239.16.18.254:6261" />

          <!-- stomp -->
          <transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?transport.closeAsync=false" />

        </transportConnectors>



        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"
class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <import resource="jetty.xml" />

</beans>