You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by "Cannock, Richard" <Ri...@edfenergy.com> on 2019/10/16 10:45:01 UTC

compositeTopic and lost messages - broker restart

Hi,

We are trying to enlist in a network of brokers arrangement with a third party broker hosted externally to us. We wish to receive messages from a topic (called sourceTopic) and forward to internal queues on our local broker for load balancing and throughput purposes using virtual destinations, a compositeTopic forwarding to two local queues on our local broker.

In order to avoid implementing ingress firewall rules for the third party onto our estate, we have created a duplex network connector so that we initiate the outbound tcp connection. We have excluded our local broker queues from propagating to the other broker using the <excludedDestinations> configuration element.

Our configuration below works fine, as long as our local broker is up, and we have an active consumer. In this case, messages published to the third party broker sourceTopic are successfully received on our composite topic forwarded queues, test1 and test2.  To avoid any confusion, the source message producer publishing to sourceTopic is explicitly using PersistentDelivery (even though I believe that is the default anyway).

If we restart our broker, any messages that were published to the sourceTopic in the intervening period that our broker was down, are not received onto local forwarded queues test and test2.

It would appear that the network connector configuration and\or compositeTopic do not result in a durable topic subscription to the sourceTopic on the other broker.

I can also successfully recreate this behaviour with two local instances of Active MQ on my development machine running side by side with broker B duplex connected to broker A, so it doesn't appear to be an issue with the third party broker either.

Our configuration (sensitive data removed) is as below:

Any ideas on how we can resolve this?

Many thanks

Richard

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useVirtualDestSubs="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                                                                <policyEntry queue="test" enableAudit="false">
                                                                                <networkBridgeFilterFactory>
                                                                                                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                                                                                </networkBridgeFilterFactory>
                                                                </policyEntry>
                                                                <policyEntry topic="sourceTopic" enableAudit="false">
                                                                                <networkBridgeFilterFactory>
                                                                                                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                                                                                </networkBridgeFilterFactory>
                                                                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>



                                <destinationInterceptors>
                                                <virtualDestinationInterceptor>
                                                  <virtualDestinations>
                                                                <compositeTopic forwardOnly="true" name=" sourceTopic">
                                                                  <forwardTo>
                                                                                <queue physicalName="test" />
                                                                                <queue physicalName="test2" />
                                                                  </forwardTo>
                                                                </compositeTopic>
                                                  </virtualDestinations>
                                                </virtualDestinationInterceptor>
                                </destinationInterceptors>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

           http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!--
                                                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                                                -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

                                <networkConnectors>
                                                <networkConnector name="edf-to-thirdpartybroker-network-connector"
                                                useVirtualDestSubs="true"
                                                duplex="true"
                                                uri="static:(tcp://127.0.0.1:61616)" userName="omitted" password="omitted" >
                                                                <staticallyIncludedDestinations>
                                                                                <topic physicalName="sourceTopic"/>
                                                                </staticallyIncludedDestinations>
                                                                <excludedDestinations>
                                                                                <queue physicalName=">"/>
                                                                </excludedDestinations>
                                                </networkConnector>
                                </networkConnectors>

                                <plugins>
                                                <simpleAuthenticationPlugin>
                                                  <users>
                                                                <authenticationUser username="system" password="omitted" groups="users,admins"/>
                                                                <authenticationUser username="queueAUser" password="omitted" groups="queueAusers"/>
                                                                <authenticationUser username="queueBUser" password="omitted" groups="queueBusers"/>
                                                  </users>
                                                </simpleAuthenticationPlugin>

                                                <authorizationPlugin>
        <map>
          <authorizationMap>
            <authorizationEntries>
              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
              <authorizationEntry queue="test" read="queueAusers" write="queueAusers" admin="queueAUsers" />
                                                  <authorizationEntry queue="test2" read="queueBusers" write="queueBusers" admin="queueBusers" />

              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />


              <authorizationEntry topic="ActiveMQ.Advisory.>" read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers" admin="admins,queueAusers,queueBusers" />
            </authorizationEntries>

            <!-- let's assign roles to temporary destinations. comment this entry if we don't want any roles assigned to temp destinations  -->

          </authorizationMap>
        </map>
      </authorizationPlugin>


                                </plugins>




                </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

Richard Cannock
Solution Architect
Business Change and IT
Customers

T: +44 (0) 2081862465
M: +44 (0)7729 320505

Barnwood
Barnett Way
Gloucester, Gloucestershire, GL4 3RS

[cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]

edfenergy.com<http://www.edfenergy.com/>

Please consider the environment before printing this email

This e-mail and any files transmitted with it are confidential and may be protected by legal privilege. If you are not the intended recipient, please notify the sender and delete the e-mail from your system.
This e-mail has been scanned for malicious content but the internet is inherently insecure and EDF Energy Limited can not accept any liability for the integrity of this message or its attachments.
No employee or agent of EDF Energy Limited or any related company is authorised to conclude any binding agreement on behalf of EDF Energy Limited or any related company by e-mail.

All e-mails sent and received by EDF Energy Limited are monitored to ensure compliance with the company's information security policy.  Executable and script files are not permitted through the EDF Energy Limited mail gateway.  EDF Energy does not accept or send mails above 30 Mb in size.

EDF Energy Limited
Registered in England and Wales No. 2366852
Registered Office: 90 Whitfield Street, London W1T 4EZ

Re: compositeTopic and lost messages - broker restart

Posted by Christopher Shannon <ch...@gmail.com>.
I put a lot of work into the network bridging in 5.x and it looks like I
didn't add documentation for some of it including the forceDurable flag and
also the ability to synchronize durable subscriptions on bridge reconnect
with a flag.  I did document the Virtual destinations demand feature which
seems relevant here. I will go back and update the network of brokers page
with the new flags/features.

For some tests that show the features see:

https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java

On Mon, Nov 11, 2019 at 1:17 PM Christopher Shannon <
christopher.l.shannon@gmail.com> wrote:

> You can force durable subscriptions by setting a flag.  Looks like this
> may not have made it into the documentation.
>
>      <networkConnectors>
>
>          <networkConnector name="edf-to-thirdpartybroker-network-connector" useVirtualDestSubs="true"
> duplex="true"
>
>                                                 uri="static:(tcp://
> 127.0.0.1:61616)" userName="omitted" password="omitted" >
>
>            <staticallyIncludedDestinations>
>
>                <topic physicalName="include.test.bar?forceDurable=true"/>
>
>            </staticallyIncludedDestinations>
>
>
>         </networkConnector>
>
>     </networkConnectors>
>
> On Mon, Nov 11, 2019 at 8:33 AM Tim Bain <tb...@alumni.duke.edu> wrote:
>
>> OK, thanks for sharing what you did.
>>
>> Tim
>>
>> On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard <
>> Richard.Cannock@edfenergy.com> wrote:
>>
>> > HI Tim
>> >
>> > Thanks for the advice. In the end, I got the network connector
>> 'promoted'
>> > to a durable topic subscriber by adding a temporary Topic to the forward
>> > declaration like this:
>> >
>> > <destinationInterceptors>
>> >         <virtualDestinationInterceptor>
>> >             <virtualDestinations>
>> >                 <compositeTopic forwardOnly="true" name="myTopic">
>> >                     <forwardTo>
>> >                         <queue physicalName="myqyeue"/>
>> >                         <topic physicalName="VirtualTopic.myTopic"/>
>> >                     </forwardTo>
>> >                 </compositeTopic>
>> >             </virtualDestinations>
>> >         </virtualDestinationInterceptor>
>> >     </destinationInterceptors>
>> >
>> > Before attaching a durable scubscriber to this topic. Once restarted the
>> > broker made the network connector durable.
>> >
>> > I then removed the   <topic physicalName="VirtualTopic.myTopic"/> and
>> > restarted the broker again. The network connector remained durable and
>> > messages remained to be sent to the queue even when the broker was
>> offline.
>> >
>> > This is clearly a workaround, so a bit nervous of it's reliability, so
>> > clearly an enhancement request to make this a duranble subscription
>> might
>> > be best.
>> >
>> > Thanks
>> >
>> > Richard
>> >
>> > -----Original Message-----
>> > From: Tim Bain <tb...@alumni.duke.edu>
>> > Sent: 08 November 2019 07:00
>> > To: ActiveMQ Users <us...@activemq.apache.org>
>> > Subject: Re: compositeTopic and lost messages - broker restart
>> >
>> > Durable topic subscriptions are dangerous things, because if the
>> consumer
>> > disappears without removing the subscription, the broker will keep the
>> > messages for it forever, eventually running the broker out of memory. A
>> > durable topic subscription between brokers (for implementing a composite
>> > destination) is doubly so, since the subscription isn't held by a
>> > traditional consumer who might give assurances of cleaning things up
>> before
>> > tearing itself down. So it's no surprise to me that a non-durable
>> > subscription is the default behavior, since that's the safer behavior.
>> >
>> > However, I think maybe your real question isn't why this isn't the
>> > default, but why there isn't an option to specify the use of a durable
>> > subscription if you so choose. Your use case seems like a valid one for
>> > that particular feature, so if you submit an enhancement request,
>> perhaps
>> > it could be added in a future version.
>> >
>> > In the meantime, could you make the composite topic in your broker use a
>> > different name than the remote topic (call it *myCompositeTopic*), and
>> then
>> > have an embedded Camel route that uses a durable subscription to consume
>> > from *myCompositeTopic* and publishes the message to *sourceTopic*? Then
>> > the network of brokers can ensure that there's a durable subscription on
>> > the remote broker, and you'll still get the composite destination on
>> your
>> > local broker. This has the additional benefit of not having
>> *sourceTopic*
>> > be a composite destination on your local broker while being a
>> non-composite
>> > destination on the remote broker, which is a pretty wonky configuration
>> > anyway.
>> >
>> > Tim
>> >
>> > On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
>> > Richard.Cannock@edfenergy.com> wrote:
>> >
>> > > HI,
>> > >
>> > > Further investigations indicate that the resulting network connector
>> > > establishes only a non durable topic subscription to the sourceTopic
>> > > via the CompositeTopic, which is why we do not get any messages queued
>> > > up at source and subsequently delivered received post the networked
>> > > broker restart.
>> > >
>> > > However, this does not explain why this is only a non durable topic
>> > > subscription, as I believe this should be a durable topic subscription
>> > > by default.
>> > >
>> > > I have tried this with various ActiveMQ versions upto 5.15.9 and the
>> > > behaviour is consistent.
>> > >
>> > > Any advice?
>> > >
>> > > Thanks
>> > >
>> > > Richard
>> > >
>> > > -----Original Message-----
>> > > From: Cannock, Richard <Ri...@edfenergy.com>
>> > > Sent: 16 October 2019 19:36
>> > > To: users@activemq.apache.org
>> > > Subject: RE: compositeTopic and lost messages - broker restart
>> > >
>> > > Apologies, mail server issue meant I wasn't sure that the original
>> > > message had gotten through. Obviously it had!
>> > >
>> > > These are the same issue so only looking for advice on one of them,
>> > > thanks.
>> > >
>> > > _______
>> > > From: Justin Bertram [jbertram@apache.org]
>> > > Sent: 16 October 2019 17:09
>> > > To: users@activemq.apache.org
>> > > Subject: Re: compositeTopic and lost messages - broker restart
>> > >
>> > > Richard, didn't you just send this same email to the ActiveMQ user
>> > > list (although with a slightly different subject)? Just want to
>> > > clarify if they are the same or different issues.
>> > >
>> > >
>> > > Justin
>> > >
>> > > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
>> > > Richard.Cannock@edfenergy.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > >
>> > > >
>> > > > We are trying to enlist in a network of brokers arrangement with a
>> > > > third party broker hosted externally to us. We wish to receive
>> > > > messages from a topic (called *sourceTopic*) and forward to internal
>> > > > queues on our local broker for load balancing and throughput
>> > > > purposes using virtual destinations, a compositeTopic forwarding to
>> > > > two local queues on our local broker.
>> > > >
>> > > >
>> > > >
>> > > > In order to avoid implementing ingress firewall rules for the third
>> > > > party onto our estate, we have created a duplex network connector so
>> > > > that we initiate the outbound tcp connection. We have excluded our
>> > > > local broker queues from propagating to the other broker using the
>> > > > <excludedDestinations> configuration element.
>> > > >
>> > > >
>> > > >
>> > > > Our configuration below works fine, as long as our local broker is
>> > > > up, and we have an active consumer. In this case, messages published
>> > > > to the third party broker *sourceTopic* are successfully received on
>> > > > our composite topic forwarded queues, test1 and test2.  To avoid any
>> > > > confusion, the source message producer publishing to *sourceTopic
>> > > > *is explicitly using PersistentDelivery (even though I believe that
>> > > > is the
>> > > default anyway).
>> > > >
>> > > >
>> > > >
>> > > > If we restart our broker, any messages that were published to the
>> > > > *sourceTopic* in the intervening period that our broker was down,
>> > > > are not received onto local forwarded queues test and test2.
>> > > >
>> > > >
>> > > >
>> > > > It would appear that the network connector configuration and\or
>> > > > compositeTopic do not result in a durable topic subscription to the
>> > > > *sourceTopic* on the other broker.
>> > > >
>> > > >
>> > > >
>> > > > I can also successfully recreate this behaviour with two local
>> > > > instances of Active MQ on my development machine running side by
>> > > > side with broker B duplex connected to broker A, so it doesn't
>> > > > appear to be an issue with the third party broker either.
>> > > >
>> > > >
>> > > >
>> > > > Our configuration (sensitive data removed) is as below:
>> > > >
>> > > >
>> > > >
>> > > > Any ideas on how we can resolve this?
>> > > >
>> > > >
>> > > >
>> > > > Many thanks
>> > > >
>> > > >
>> > > > Richard
>> > > >
>> > > >
>> > > >
>> > > > <beans
>> > > >
>> > > >   xmlns="http://www.springframework.org/schema/beans"
>> > > >
>> > > >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> > > >
>> > > >   xsi:schemaLocation="http://www.springframework.org/schema/beans
>> > > > http://www.springframework.org/schema/beans/spring-beans.xsd
>> > > >
>> > > >   http://activemq.apache.org/schema/core
>> > > > http://activemq.apache.org/schema/core/activemq-core.xsd">
>> > > >
>> > > >
>> > > >
>> > > >     <!-- Allows us to use system properties as variables in this
>> > > > configuration file -->
>> > > >
>> > > >     <bean
>> > > > class="org.springframework.beans.factory.config.PropertyPlaceholderC
>> > > > on
>> > > > figurer">
>> > > >
>> > > >         <property name="locations">
>> > > >
>> > > >
>> > > > <value>file:${activemq.conf}/credentials.properties</value>
>> > > >
>> > > >         </property>
>> > > >
>> > > >     </bean>
>> > > >
>> > > >
>> > > >
>> > > >    <!-- Allows accessing the server log -->
>> > > >
>> > > >     <bean id="logQuery"
>> > > class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>> > > >
>> > > >           lazy-init="false" scope="singleton"
>> > > >
>> > > >           init-method="start" destroy-method="stop">
>> > > >
>> > > >     </bean>
>> > > >
>> > > >
>> > > >
>> > > >     <!--
>> > > >
>> > > >         The <broker> element is used to configure the ActiveMQ
>> broker.
>> > > >
>> > > >     -->
>> > > >
>> > > >     <broker xmlns="http://activemq.apache.org/schema/core"
>> > > > brokerName="localhost" dataDirectory="${activemq.data}"
>> > > > useVirtualDestSubs="true">
>> > > >
>> > > >
>> > > >
>> > > >         <destinationPolicy>
>> > > >
>> > > >             <policyMap>
>> > > >
>> > > >               <policyEntries>
>> > > >
>> > > >                 <policyEntry topic=">" >
>> > > >
>> > > >                     <!-- The constantPendingMessageLimitStrategy is
>> > > > used to prevent
>> > > >
>> > > >                          slow topic consumers to block producers and
>> > > > affect other consumers
>> > > >
>> > > >                          by limiting the number of messages that are
>> > > > retained
>> > > >
>> > > >                          For more information, see:
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > http://activemq.apache.org/slow-consumer-handling.html
>> > > >
>> > > >
>> > > >
>> > > >                     -->
>> > > >
>> > > >                   <pendingMessageLimitStrategy>
>> > > >
>> > > >                     <constantPendingMessageLimitStrategy
>> > > > limit="1000"/>
>> > > >
>> > > >                   </pendingMessageLimitStrategy>
>> > > >
>> > > >                 </policyEntry>
>> > > >
>> > > >
>> > > > <policyEntry queue="test" enableAudit="false">
>> > > >
>> > > >
>> > > > <networkBridgeFilterFactory>
>> > > >
>> > > >
>> > > > <conditionalNetworkBridgeFilterFactory
>> > > > replayWhenNoConsumers="true"/>
>> > > >
>> > > >
>> > > > </networkBridgeFilterFactory>
>> > > >
>> > > >
>> > > > </policyEntry>
>> > > >
>> > > >
>> > > > <policyEntry topic="*sourceTopic*" enableAudit="false">
>> > > >
>> > > >
>> > > > <networkBridgeFilterFactory>
>> > > >
>> > > >
>> > > > <conditionalNetworkBridgeFilterFactory
>> > > > replayWhenNoConsumers="true"/>
>> > > >
>> > > >
>> > > > </networkBridgeFilterFactory>
>> > > >
>> > > >
>> > > > </policyEntry>
>> > > >
>> > > >               </policyEntries>
>> > > >
>> > > >             </policyMap>
>> > > >
>> > > >         </destinationPolicy>
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >                                 <destinationInterceptors>
>> > > >
>> > > >
>> > > > <virtualDestinationInterceptor>
>> > > >
>> > > >
>> > > > <virtualDestinations>
>> > > >
>> > > >
>> > > > <compositeTopic forwardOnly="true" name="* sourceTopic*">
>> > > >
>> > > >
>> > > > <forwardTo>
>> > > >
>> > > >
>> > > > <queue physicalName="test" />
>> > > >
>> > > >
>> > > >
>> > > > <queue physicalName="test2" />
>> > > >
>> > > >
>> > > > </forwardTo>
>> > > >
>> > > >
>> > > > </compositeTopic>
>> > > >
>> > > >
>> > > > </virtualDestinations>
>> > > >
>> > > >
>> > > > </virtualDestinationInterceptor>
>> > > >
>> > > >                                 </destinationInterceptors>
>> > > >
>> > > >
>> > > >
>> > > >         <!--
>> > > >
>> > > >             The managementContext is used to configure how ActiveMQ
>> > > > is exposed in
>> > > >
>> > > >             JMX. By default, ActiveMQ uses the MBean server that is
>> > > > started by
>> > > >
>> > > >             the JVM. For more information, see:
>> > > >
>> > > >
>> > > >
>> > > >            http://activemq.apache.org/jmx.html
>> > > >
>> > > >         -->
>> > > >
>> > > >         <managementContext>
>> > > >
>> > > >             <managementContext createConnector="false"/>
>> > > >
>> > > >         </managementContext>
>> > > >
>> > > >
>> > > >
>> > > >         <!--
>> > > >
>> > > >             Configure message persistence for the broker. The
>> > > > default persistence
>> > > >
>> > > >             mechanism is the KahaDB store (identified by the kahaDB
>> > tag).
>> > > >
>> > > >             For more information, see:
>> > > >
>> > > >
>> > > >
>> > > >             http://activemq.apache.org/persistence.html
>> > > >
>> > > >         -->
>> > > >
>> > > >         <persistenceAdapter>
>> > > >
>> > > >             <kahaDB directory="${activemq.data}/kahadb"/>
>> > > >
>> > > >         </persistenceAdapter>
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >           <!--
>> > > >
>> > > >             The systemUsage controls the maximum amount of space the
>> > > > broker will
>> > > >
>> > > >             use before disabling caching and/or slowing down
>> producers.
>> > > > For more information, see:
>> > > >
>> > > >             http://activemq.apache.org/producer-flow-control.html
>> > > >
>> > > >           -->
>> > > >
>> > > >           <systemUsage>
>> > > >
>> > > >             <systemUsage>
>> > > >
>> > > >                 <memoryUsage>
>> > > >
>> > > >                     <memoryUsage percentOfJvmHeap="70" />
>> > > >
>> > > >                 </memoryUsage>
>> > > >
>> > > >                 <storeUsage>
>> > > >
>> > > >                     <storeUsage limit="100 gb"/>
>> > > >
>> > > >                 </storeUsage>
>> > > >
>> > > >                 <tempUsage>
>> > > >
>> > > >                     <tempUsage limit="50 gb"/>
>> > > >
>> > > >                 </tempUsage>
>> > > >
>> > > >             </systemUsage>
>> > > >
>> > > >         </systemUsage>
>> > > >
>> > > >
>> > > >
>> > > >         <!--
>> > > >
>> > > >             The transport connectors expose ActiveMQ over a given
>> > > > protocol to
>> > > >
>> > > >             clients and other brokers. For more information, see:
>> > > >
>> > > >
>> > > >
>> > > >             http://activemq.apache.org/configuring-transports.html
>> > > >
>> > > >         -->
>> > > >
>> > > >         <transportConnectors>
>> > > >
>> > > >             <!-- DOS protection, limit concurrent connections to
>> > > > 1000 and frame size to 100MB -->
>> > > >
>> > > >             <transportConnector name="openwire" uri="tcp://
>> > > >
>> 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
>> > > > 48
>> > > > 57600
>> > > > "/>
>> > > >
>> > > >             <!--
>> > > >
>> > > >                                                 <transportConnector
>> > > > name="amqp" uri="amqp://
>> > > >
>> 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
>> > > > 85
>> > > > 7600
>> > > > "/>
>> > > >
>> > > >             <transportConnector name="stomp" uri="stomp://
>> > > >
>> 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
>> > > > 48
>> > > > 57600
>> > > > "/>
>> > > >
>> > > >             <transportConnector name="mqtt" uri="mqtt://
>> > > >
>> 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
>> > > > 85
>> > > > 7600
>> > > > "/>
>> > > >
>> > > >             <transportConnector name="ws" uri="ws://
>> > > >
>> 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
>> > > > 48
>> > > > 57600
>> > > > "/>
>> > > >
>> > > >                                                 -->
>> > > >
>> > > >         </transportConnectors>
>> > > >
>> > > >
>> > > >
>> > > >         <!-- destroy the spring context on shutdown to stop jetty
>> > > > -->
>> > > >
>> > > >         <shutdownHooks>
>> > > >
>> > > >             <bean xmlns="
>> http://www.springframework.org/schema/beans"
>> > > > class="org.apache.activemq.hooks.SpringContextHook" />
>> > > >
>> > > >         </shutdownHooks>
>> > > >
>> > > >
>> > > >
>> > > >                                 <networkConnectors>
>> > > >
>> > > >                                                 <networkConnector
>> > > > name="edf-to-thirdpartybroker-network-connector"
>> > > >
>> > > >
>> >  useVirtualDestSubs="true"
>> > > >
>> > > >                                                 duplex="true"
>> > > >
>> > > >                                                 uri="static:(tcp://
>> > > > 127.0.0.1:61616)" userName="omitted" password="omitted" >
>> > > >
>> > > >
>> > > > <staticallyIncludedDestinations>
>> > > >
>> > > >
>> > > > <topic physicalName="*sourceTopic*"/>
>> > > >
>> > > >
>> > > > </staticallyIncludedDestinations>
>> > > >
>> > > >
>> > > > <excludedDestinations>
>> > > >
>> > > >
>> > > > <queue physicalName=">"/>
>> > > >
>> > > >
>> > > > </excludedDestinations>
>> > > >
>> > > >
>> > > >                                                 </networkConnector>
>> > > >
>> > > >                                 </networkConnectors>
>> > > >
>> > > >
>> > > >
>> > > >                                 <plugins>
>> > > >
>> > > >
>> > > > <simpleAuthenticationPlugin>
>> > > >
>> > > >                                                   <users>
>> > > >
>> > > >
>> > > > <authenticationUser username="system" password="omitted"
>> > > > groups="users,admins"/>
>> > > >
>> > > >
>> > > > <authenticationUser username="queueAUser" password="omitted"
>> > > > groups="queueAusers"/>
>> > > >
>> > > >
>> > > > <authenticationUser username="queueBUser" password="omitted"
>> > > > groups="queueBusers"/>
>> > > >
>> > > >                                                   </users>
>> > > >
>> > > >
>> > > > </simpleAuthenticationPlugin>
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > <authorizationPlugin>
>> > > >
>> > > >         <map>
>> > > >
>> > > >           <authorizationMap>
>> > > >
>> > > >             <authorizationEntries>
>> > > >
>> > > >               <authorizationEntry queue=">" read="admins"
>> > write="admins"
>> > > > admin="admins" />
>> > > >
>> > > >               <authorizationEntry queue="test" read="queueAusers"
>> > > > write="queueAusers" admin="queueAUsers" />
>> > > >
>> > > >
>> > > > <authorizationEntry queue="test2" read="queueBusers"
>> > write="queueBusers"
>> > > > admin="queueBusers" />
>> > > >
>> > > >
>> > > >
>> > > >               <authorizationEntry topic=">" read="admins"
>> > write="admins"
>> > > > admin="admins" />
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >               <authorizationEntry topic="ActiveMQ.Advisory.>"
>> > > > read="queueAusers,queueBusers"
>> write="admins,queueAusers,queueBusers"
>> > > > admin="admins,queueAusers,queueBusers" />
>> > > >
>> > > >             </authorizationEntries>
>> > > >
>> > > >
>> > > >
>> > > >             <!-- let's assign roles to temporary destinations.
>> > > > comment this entry if we don't want any roles assigned to temp
>> > > > destinations
>> > > > -->
>> > > >
>> > > >
>> > > >
>> > > >           </authorizationMap>
>> > > >
>> > > >         </map>
>> > > >
>> > > >       </authorizationPlugin>
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >                                 </plugins>
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >                 </broker>
>> > > >
>> > > >
>> > > >
>> > > >     <!--
>> > > >
>> > > >         Enable web consoles, REST and Ajax APIs and demos
>> > > >
>> > > >         The web consoles requires by default login, you can disable
>> > > > this in the jetty.xml file
>> > > >
>> > > >
>> > > >
>> > > >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more
>> > > > details
>> > > >
>> > > >     -->
>> > > >
>> > > >     <import resource="jetty.xml"/>
>> > > >
>> > > >
>> > > >
>> > > > </beans>
>> > > >
>> > > > <!-- END SNIPPET: example -->
>> > > >
>> > > >
>> > > >
>> > > > *Richard Cannock*
>> > > >
>> > > > *Solution Architect*
>> > > >
>> > > > Business Change and IT
>> > > >
>> > > > Customers
>> > > >
>> > > >
>> > > >
>> > > > *T:* +44 (0) 2081862465
>> > > >
>> > > > *M:* +44 (0)7729 320505
>> > > >
>> > > >
>> > > >
>> > > > Barnwood
>> > > >
>> > > > Barnett Way
>> > > >
>> > > > Gloucester, Gloucestershire, GL4 3RS
>> > > >
>> > > >
>> > > >
>> > > > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
>> > > >
>> > > >
>> > > >
>> > > > edfenergy.com <http://www.edfenergy.com/>
>> > > >
>> > > >
>> > > >
>> > > > Please consider the environment before printing this email
>> > > >
>> > > >
>> > > >
>> > > > This e-mail and any files transmitted with it are confidential and
>> > > > may be protected by legal privilege. If you are not the intended
>> > > > recipient, please notify the sender and delete the e-mail from your
>> > > system.
>> > > > This e-mail has been scanned for malicious content but the internet
>> > > > is inherently insecure and EDF Energy Limited can not accept any
>> > > > liability for the integrity of this message or its attachments.
>> > > > No employee or agent of EDF Energy Limited or any related company is
>> > > > authorised to conclude any binding agreement on behalf of EDF Energy
>> > > > Limited or any related company by e-mail.
>> > > >
>> > > > All e-mails sent and received by EDF Energy Limited are monitored to
>> > > > ensure compliance with the company's information security policy.
>> > > > Executable and script files are not permitted through the EDF Energy
>> > > > Limited mail gateway.  EDF Energy does not accept or send mails
>> > > > above
>> > > > 30 Mb in size.
>> > > >
>> > > > EDF Energy Limited
>> > > > Registered in England and Wales No. 2366852 Registered Office: 90
>> > > > Whitfield Street, London W1T 4EZ
>> > > >
>> > >
>> > >
>> > > This e-mail and any files transmitted with it are confidential and may
>> > > be protected by legal privilege. If you are not the intended
>> > > recipient, please notify the sender and delete the e-mail from your
>> > system.
>> > > This e-mail has been scanned for malicious content but the internet is
>> > > inherently insecure and EDF Energy Limited cannot accept any liability
>> > > for the integrity of this message or its attachments. No employee or
>> > > agent of EDF Energy Limited or any related company is authorised to
>> > > conclude any binding agreement on behalf of EDF Energy Limited or any
>> > > related company by e-mail.
>> > > All e-mails sent and received by EDF Energy Limited are monitored to
>> > > ensure compliance with the company's information security policy.
>> > > Executable and script files are not permitted through the EDF Energy
>> > > Limited mail gateway. EDF Energy does not accept or send mails above
>> > > 30 Mb in size.
>> > >
>> > > EDF Energy Limited Registered in England and Wales No. 2366852
>> > > Registered
>> > > Office: 90 Whitfield Street, London W1T 4EZ
>> > >
>> >
>> >
>> > This e-mail and any files transmitted with it are confidential and may
>> be
>> > protected by legal privilege. If you are not the intended recipient,
>> please
>> > notify the sender and delete the e-mail from your system.
>> > This e-mail has been scanned for malicious content but the internet is
>> > inherently insecure and EDF Energy Limited cannot accept any liability
>> for
>> > the integrity of this message or its attachments. No employee or agent
>> of
>> > EDF Energy Limited or any related company is authorised to conclude any
>> > binding agreement on behalf of EDF Energy Limited or any related
>> company by
>> > e-mail.
>> > All e-mails sent and received by EDF Energy Limited are monitored to
>> > ensure compliance with the company's information security policy.
>> > Executable and script files are not permitted through the EDF Energy
>> > Limited mail gateway. EDF Energy does not accept or send mails above 30
>> Mb
>> > in size.
>> >
>> > EDF Energy Limited Registered in England and Wales No. 2366852
>> Registered
>> > Office: 90 Whitfield Street, London W1T 4EZ
>> >
>>
>

Re: compositeTopic and lost messages - broker restart

Posted by Christopher Shannon <ch...@gmail.com>.
You can force durable subscriptions by setting a flag.  Looks like this may
not have made it into the documentation.

     <networkConnectors>

         <networkConnector
name="edf-to-thirdpartybroker-network-connector"
useVirtualDestSubs="true"
duplex="true"

                                                uri="static:(tcp://
127.0.0.1:61616)" userName="omitted" password="omitted" >

           <staticallyIncludedDestinations>

               <topic physicalName="include.test.bar?forceDurable=true"/>

           </staticallyIncludedDestinations>


        </networkConnector>

    </networkConnectors>

On Mon, Nov 11, 2019 at 8:33 AM Tim Bain <tb...@alumni.duke.edu> wrote:

> OK, thanks for sharing what you did.
>
> Tim
>
> On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard <
> Richard.Cannock@edfenergy.com> wrote:
>
> > HI Tim
> >
> > Thanks for the advice. In the end, I got the network connector 'promoted'
> > to a durable topic subscriber by adding a temporary Topic to the forward
> > declaration like this:
> >
> > <destinationInterceptors>
> >         <virtualDestinationInterceptor>
> >             <virtualDestinations>
> >                 <compositeTopic forwardOnly="true" name="myTopic">
> >                     <forwardTo>
> >                         <queue physicalName="myqyeue"/>
> >                         <topic physicalName="VirtualTopic.myTopic"/>
> >                     </forwardTo>
> >                 </compositeTopic>
> >             </virtualDestinations>
> >         </virtualDestinationInterceptor>
> >     </destinationInterceptors>
> >
> > Before attaching a durable scubscriber to this topic. Once restarted the
> > broker made the network connector durable.
> >
> > I then removed the   <topic physicalName="VirtualTopic.myTopic"/> and
> > restarted the broker again. The network connector remained durable and
> > messages remained to be sent to the queue even when the broker was
> offline.
> >
> > This is clearly a workaround, so a bit nervous of it's reliability, so
> > clearly an enhancement request to make this a duranble subscription might
> > be best.
> >
> > Thanks
> >
> > Richard
> >
> > -----Original Message-----
> > From: Tim Bain <tb...@alumni.duke.edu>
> > Sent: 08 November 2019 07:00
> > To: ActiveMQ Users <us...@activemq.apache.org>
> > Subject: Re: compositeTopic and lost messages - broker restart
> >
> > Durable topic subscriptions are dangerous things, because if the consumer
> > disappears without removing the subscription, the broker will keep the
> > messages for it forever, eventually running the broker out of memory. A
> > durable topic subscription between brokers (for implementing a composite
> > destination) is doubly so, since the subscription isn't held by a
> > traditional consumer who might give assurances of cleaning things up
> before
> > tearing itself down. So it's no surprise to me that a non-durable
> > subscription is the default behavior, since that's the safer behavior.
> >
> > However, I think maybe your real question isn't why this isn't the
> > default, but why there isn't an option to specify the use of a durable
> > subscription if you so choose. Your use case seems like a valid one for
> > that particular feature, so if you submit an enhancement request, perhaps
> > it could be added in a future version.
> >
> > In the meantime, could you make the composite topic in your broker use a
> > different name than the remote topic (call it *myCompositeTopic*), and
> then
> > have an embedded Camel route that uses a durable subscription to consume
> > from *myCompositeTopic* and publishes the message to *sourceTopic*? Then
> > the network of brokers can ensure that there's a durable subscription on
> > the remote broker, and you'll still get the composite destination on your
> > local broker. This has the additional benefit of not having *sourceTopic*
> > be a composite destination on your local broker while being a
> non-composite
> > destination on the remote broker, which is a pretty wonky configuration
> > anyway.
> >
> > Tim
> >
> > On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
> > Richard.Cannock@edfenergy.com> wrote:
> >
> > > HI,
> > >
> > > Further investigations indicate that the resulting network connector
> > > establishes only a non durable topic subscription to the sourceTopic
> > > via the CompositeTopic, which is why we do not get any messages queued
> > > up at source and subsequently delivered received post the networked
> > > broker restart.
> > >
> > > However, this does not explain why this is only a non durable topic
> > > subscription, as I believe this should be a durable topic subscription
> > > by default.
> > >
> > > I have tried this with various ActiveMQ versions upto 5.15.9 and the
> > > behaviour is consistent.
> > >
> > > Any advice?
> > >
> > > Thanks
> > >
> > > Richard
> > >
> > > -----Original Message-----
> > > From: Cannock, Richard <Ri...@edfenergy.com>
> > > Sent: 16 October 2019 19:36
> > > To: users@activemq.apache.org
> > > Subject: RE: compositeTopic and lost messages - broker restart
> > >
> > > Apologies, mail server issue meant I wasn't sure that the original
> > > message had gotten through. Obviously it had!
> > >
> > > These are the same issue so only looking for advice on one of them,
> > > thanks.
> > >
> > > _______
> > > From: Justin Bertram [jbertram@apache.org]
> > > Sent: 16 October 2019 17:09
> > > To: users@activemq.apache.org
> > > Subject: Re: compositeTopic and lost messages - broker restart
> > >
> > > Richard, didn't you just send this same email to the ActiveMQ user
> > > list (although with a slightly different subject)? Just want to
> > > clarify if they are the same or different issues.
> > >
> > >
> > > Justin
> > >
> > > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
> > > Richard.Cannock@edfenergy.com> wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > We are trying to enlist in a network of brokers arrangement with a
> > > > third party broker hosted externally to us. We wish to receive
> > > > messages from a topic (called *sourceTopic*) and forward to internal
> > > > queues on our local broker for load balancing and throughput
> > > > purposes using virtual destinations, a compositeTopic forwarding to
> > > > two local queues on our local broker.
> > > >
> > > >
> > > >
> > > > In order to avoid implementing ingress firewall rules for the third
> > > > party onto our estate, we have created a duplex network connector so
> > > > that we initiate the outbound tcp connection. We have excluded our
> > > > local broker queues from propagating to the other broker using the
> > > > <excludedDestinations> configuration element.
> > > >
> > > >
> > > >
> > > > Our configuration below works fine, as long as our local broker is
> > > > up, and we have an active consumer. In this case, messages published
> > > > to the third party broker *sourceTopic* are successfully received on
> > > > our composite topic forwarded queues, test1 and test2.  To avoid any
> > > > confusion, the source message producer publishing to *sourceTopic
> > > > *is explicitly using PersistentDelivery (even though I believe that
> > > > is the
> > > default anyway).
> > > >
> > > >
> > > >
> > > > If we restart our broker, any messages that were published to the
> > > > *sourceTopic* in the intervening period that our broker was down,
> > > > are not received onto local forwarded queues test and test2.
> > > >
> > > >
> > > >
> > > > It would appear that the network connector configuration and\or
> > > > compositeTopic do not result in a durable topic subscription to the
> > > > *sourceTopic* on the other broker.
> > > >
> > > >
> > > >
> > > > I can also successfully recreate this behaviour with two local
> > > > instances of Active MQ on my development machine running side by
> > > > side with broker B duplex connected to broker A, so it doesn't
> > > > appear to be an issue with the third party broker either.
> > > >
> > > >
> > > >
> > > > Our configuration (sensitive data removed) is as below:
> > > >
> > > >
> > > >
> > > > Any ideas on how we can resolve this?
> > > >
> > > >
> > > >
> > > > Many thanks
> > > >
> > > >
> > > > Richard
> > > >
> > > >
> > > >
> > > > <beans
> > > >
> > > >   xmlns="http://www.springframework.org/schema/beans"
> > > >
> > > >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> > > >
> > > >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> > > > http://www.springframework.org/schema/beans/spring-beans.xsd
> > > >
> > > >   http://activemq.apache.org/schema/core
> > > > http://activemq.apache.org/schema/core/activemq-core.xsd">
> > > >
> > > >
> > > >
> > > >     <!-- Allows us to use system properties as variables in this
> > > > configuration file -->
> > > >
> > > >     <bean
> > > > class="org.springframework.beans.factory.config.PropertyPlaceholderC
> > > > on
> > > > figurer">
> > > >
> > > >         <property name="locations">
> > > >
> > > >
> > > > <value>file:${activemq.conf}/credentials.properties</value>
> > > >
> > > >         </property>
> > > >
> > > >     </bean>
> > > >
> > > >
> > > >
> > > >    <!-- Allows accessing the server log -->
> > > >
> > > >     <bean id="logQuery"
> > > class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> > > >
> > > >           lazy-init="false" scope="singleton"
> > > >
> > > >           init-method="start" destroy-method="stop">
> > > >
> > > >     </bean>
> > > >
> > > >
> > > >
> > > >     <!--
> > > >
> > > >         The <broker> element is used to configure the ActiveMQ
> broker.
> > > >
> > > >     -->
> > > >
> > > >     <broker xmlns="http://activemq.apache.org/schema/core"
> > > > brokerName="localhost" dataDirectory="${activemq.data}"
> > > > useVirtualDestSubs="true">
> > > >
> > > >
> > > >
> > > >         <destinationPolicy>
> > > >
> > > >             <policyMap>
> > > >
> > > >               <policyEntries>
> > > >
> > > >                 <policyEntry topic=">" >
> > > >
> > > >                     <!-- The constantPendingMessageLimitStrategy is
> > > > used to prevent
> > > >
> > > >                          slow topic consumers to block producers and
> > > > affect other consumers
> > > >
> > > >                          by limiting the number of messages that are
> > > > retained
> > > >
> > > >                          For more information, see:
> > > >
> > > >
> > > >
> > > >
> > > > http://activemq.apache.org/slow-consumer-handling.html
> > > >
> > > >
> > > >
> > > >                     -->
> > > >
> > > >                   <pendingMessageLimitStrategy>
> > > >
> > > >                     <constantPendingMessageLimitStrategy
> > > > limit="1000"/>
> > > >
> > > >                   </pendingMessageLimitStrategy>
> > > >
> > > >                 </policyEntry>
> > > >
> > > >
> > > > <policyEntry queue="test" enableAudit="false">
> > > >
> > > >
> > > > <networkBridgeFilterFactory>
> > > >
> > > >
> > > > <conditionalNetworkBridgeFilterFactory
> > > > replayWhenNoConsumers="true"/>
> > > >
> > > >
> > > > </networkBridgeFilterFactory>
> > > >
> > > >
> > > > </policyEntry>
> > > >
> > > >
> > > > <policyEntry topic="*sourceTopic*" enableAudit="false">
> > > >
> > > >
> > > > <networkBridgeFilterFactory>
> > > >
> > > >
> > > > <conditionalNetworkBridgeFilterFactory
> > > > replayWhenNoConsumers="true"/>
> > > >
> > > >
> > > > </networkBridgeFilterFactory>
> > > >
> > > >
> > > > </policyEntry>
> > > >
> > > >               </policyEntries>
> > > >
> > > >             </policyMap>
> > > >
> > > >         </destinationPolicy>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                                 <destinationInterceptors>
> > > >
> > > >
> > > > <virtualDestinationInterceptor>
> > > >
> > > >
> > > > <virtualDestinations>
> > > >
> > > >
> > > > <compositeTopic forwardOnly="true" name="* sourceTopic*">
> > > >
> > > >
> > > > <forwardTo>
> > > >
> > > >
> > > > <queue physicalName="test" />
> > > >
> > > >
> > > >
> > > > <queue physicalName="test2" />
> > > >
> > > >
> > > > </forwardTo>
> > > >
> > > >
> > > > </compositeTopic>
> > > >
> > > >
> > > > </virtualDestinations>
> > > >
> > > >
> > > > </virtualDestinationInterceptor>
> > > >
> > > >                                 </destinationInterceptors>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             The managementContext is used to configure how ActiveMQ
> > > > is exposed in
> > > >
> > > >             JMX. By default, ActiveMQ uses the MBean server that is
> > > > started by
> > > >
> > > >             the JVM. For more information, see:
> > > >
> > > >
> > > >
> > > >            http://activemq.apache.org/jmx.html
> > > >
> > > >         -->
> > > >
> > > >         <managementContext>
> > > >
> > > >             <managementContext createConnector="false"/>
> > > >
> > > >         </managementContext>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             Configure message persistence for the broker. The
> > > > default persistence
> > > >
> > > >             mechanism is the KahaDB store (identified by the kahaDB
> > tag).
> > > >
> > > >             For more information, see:
> > > >
> > > >
> > > >
> > > >             http://activemq.apache.org/persistence.html
> > > >
> > > >         -->
> > > >
> > > >         <persistenceAdapter>
> > > >
> > > >             <kahaDB directory="${activemq.data}/kahadb"/>
> > > >
> > > >         </persistenceAdapter>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >           <!--
> > > >
> > > >             The systemUsage controls the maximum amount of space the
> > > > broker will
> > > >
> > > >             use before disabling caching and/or slowing down
> producers.
> > > > For more information, see:
> > > >
> > > >             http://activemq.apache.org/producer-flow-control.html
> > > >
> > > >           -->
> > > >
> > > >           <systemUsage>
> > > >
> > > >             <systemUsage>
> > > >
> > > >                 <memoryUsage>
> > > >
> > > >                     <memoryUsage percentOfJvmHeap="70" />
> > > >
> > > >                 </memoryUsage>
> > > >
> > > >                 <storeUsage>
> > > >
> > > >                     <storeUsage limit="100 gb"/>
> > > >
> > > >                 </storeUsage>
> > > >
> > > >                 <tempUsage>
> > > >
> > > >                     <tempUsage limit="50 gb"/>
> > > >
> > > >                 </tempUsage>
> > > >
> > > >             </systemUsage>
> > > >
> > > >         </systemUsage>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             The transport connectors expose ActiveMQ over a given
> > > > protocol to
> > > >
> > > >             clients and other brokers. For more information, see:
> > > >
> > > >
> > > >
> > > >             http://activemq.apache.org/configuring-transports.html
> > > >
> > > >         -->
> > > >
> > > >         <transportConnectors>
> > > >
> > > >             <!-- DOS protection, limit concurrent connections to
> > > > 1000 and frame size to 100MB -->
> > > >
> > > >             <transportConnector name="openwire" uri="tcp://
> > > > 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >             <!--
> > > >
> > > >                                                 <transportConnector
> > > > name="amqp" uri="amqp://
> > > > 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > > 85
> > > > 7600
> > > > "/>
> > > >
> > > >             <transportConnector name="stomp" uri="stomp://
> > > > 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >             <transportConnector name="mqtt" uri="mqtt://
> > > > 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > > 85
> > > > 7600
> > > > "/>
> > > >
> > > >             <transportConnector name="ws" uri="ws://
> > > > 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >                                                 -->
> > > >
> > > >         </transportConnectors>
> > > >
> > > >
> > > >
> > > >         <!-- destroy the spring context on shutdown to stop jetty
> > > > -->
> > > >
> > > >         <shutdownHooks>
> > > >
> > > >             <bean xmlns="http://www.springframework.org/schema/beans
> "
> > > > class="org.apache.activemq.hooks.SpringContextHook" />
> > > >
> > > >         </shutdownHooks>
> > > >
> > > >
> > > >
> > > >                                 <networkConnectors>
> > > >
> > > >                                                 <networkConnector
> > > > name="edf-to-thirdpartybroker-network-connector"
> > > >
> > > >
> >  useVirtualDestSubs="true"
> > > >
> > > >                                                 duplex="true"
> > > >
> > > >                                                 uri="static:(tcp://
> > > > 127.0.0.1:61616)" userName="omitted" password="omitted" >
> > > >
> > > >
> > > > <staticallyIncludedDestinations>
> > > >
> > > >
> > > > <topic physicalName="*sourceTopic*"/>
> > > >
> > > >
> > > > </staticallyIncludedDestinations>
> > > >
> > > >
> > > > <excludedDestinations>
> > > >
> > > >
> > > > <queue physicalName=">"/>
> > > >
> > > >
> > > > </excludedDestinations>
> > > >
> > > >
> > > >                                                 </networkConnector>
> > > >
> > > >                                 </networkConnectors>
> > > >
> > > >
> > > >
> > > >                                 <plugins>
> > > >
> > > >
> > > > <simpleAuthenticationPlugin>
> > > >
> > > >                                                   <users>
> > > >
> > > >
> > > > <authenticationUser username="system" password="omitted"
> > > > groups="users,admins"/>
> > > >
> > > >
> > > > <authenticationUser username="queueAUser" password="omitted"
> > > > groups="queueAusers"/>
> > > >
> > > >
> > > > <authenticationUser username="queueBUser" password="omitted"
> > > > groups="queueBusers"/>
> > > >
> > > >                                                   </users>
> > > >
> > > >
> > > > </simpleAuthenticationPlugin>
> > > >
> > > >
> > > >
> > > >
> > > > <authorizationPlugin>
> > > >
> > > >         <map>
> > > >
> > > >           <authorizationMap>
> > > >
> > > >             <authorizationEntries>
> > > >
> > > >               <authorizationEntry queue=">" read="admins"
> > write="admins"
> > > > admin="admins" />
> > > >
> > > >               <authorizationEntry queue="test" read="queueAusers"
> > > > write="queueAusers" admin="queueAUsers" />
> > > >
> > > >
> > > > <authorizationEntry queue="test2" read="queueBusers"
> > write="queueBusers"
> > > > admin="queueBusers" />
> > > >
> > > >
> > > >
> > > >               <authorizationEntry topic=">" read="admins"
> > write="admins"
> > > > admin="admins" />
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >               <authorizationEntry topic="ActiveMQ.Advisory.>"
> > > > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> > > > admin="admins,queueAusers,queueBusers" />
> > > >
> > > >             </authorizationEntries>
> > > >
> > > >
> > > >
> > > >             <!-- let's assign roles to temporary destinations.
> > > > comment this entry if we don't want any roles assigned to temp
> > > > destinations
> > > > -->
> > > >
> > > >
> > > >
> > > >           </authorizationMap>
> > > >
> > > >         </map>
> > > >
> > > >       </authorizationPlugin>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                                 </plugins>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                 </broker>
> > > >
> > > >
> > > >
> > > >     <!--
> > > >
> > > >         Enable web consoles, REST and Ajax APIs and demos
> > > >
> > > >         The web consoles requires by default login, you can disable
> > > > this in the jetty.xml file
> > > >
> > > >
> > > >
> > > >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more
> > > > details
> > > >
> > > >     -->
> > > >
> > > >     <import resource="jetty.xml"/>
> > > >
> > > >
> > > >
> > > > </beans>
> > > >
> > > > <!-- END SNIPPET: example -->
> > > >
> > > >
> > > >
> > > > *Richard Cannock*
> > > >
> > > > *Solution Architect*
> > > >
> > > > Business Change and IT
> > > >
> > > > Customers
> > > >
> > > >
> > > >
> > > > *T:* +44 (0) 2081862465
> > > >
> > > > *M:* +44 (0)7729 320505
> > > >
> > > >
> > > >
> > > > Barnwood
> > > >
> > > > Barnett Way
> > > >
> > > > Gloucester, Gloucestershire, GL4 3RS
> > > >
> > > >
> > > >
> > > > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
> > > >
> > > >
> > > >
> > > > edfenergy.com <http://www.edfenergy.com/>
> > > >
> > > >
> > > >
> > > > Please consider the environment before printing this email
> > > >
> > > >
> > > >
> > > > This e-mail and any files transmitted with it are confidential and
> > > > may be protected by legal privilege. If you are not the intended
> > > > recipient, please notify the sender and delete the e-mail from your
> > > system.
> > > > This e-mail has been scanned for malicious content but the internet
> > > > is inherently insecure and EDF Energy Limited can not accept any
> > > > liability for the integrity of this message or its attachments.
> > > > No employee or agent of EDF Energy Limited or any related company is
> > > > authorised to conclude any binding agreement on behalf of EDF Energy
> > > > Limited or any related company by e-mail.
> > > >
> > > > All e-mails sent and received by EDF Energy Limited are monitored to
> > > > ensure compliance with the company's information security policy.
> > > > Executable and script files are not permitted through the EDF Energy
> > > > Limited mail gateway.  EDF Energy does not accept or send mails
> > > > above
> > > > 30 Mb in size.
> > > >
> > > > EDF Energy Limited
> > > > Registered in England and Wales No. 2366852 Registered Office: 90
> > > > Whitfield Street, London W1T 4EZ
> > > >
> > >
> > >
> > > This e-mail and any files transmitted with it are confidential and may
> > > be protected by legal privilege. If you are not the intended
> > > recipient, please notify the sender and delete the e-mail from your
> > system.
> > > This e-mail has been scanned for malicious content but the internet is
> > > inherently insecure and EDF Energy Limited cannot accept any liability
> > > for the integrity of this message or its attachments. No employee or
> > > agent of EDF Energy Limited or any related company is authorised to
> > > conclude any binding agreement on behalf of EDF Energy Limited or any
> > > related company by e-mail.
> > > All e-mails sent and received by EDF Energy Limited are monitored to
> > > ensure compliance with the company's information security policy.
> > > Executable and script files are not permitted through the EDF Energy
> > > Limited mail gateway. EDF Energy does not accept or send mails above
> > > 30 Mb in size.
> > >
> > > EDF Energy Limited Registered in England and Wales No. 2366852
> > > Registered
> > > Office: 90 Whitfield Street, London W1T 4EZ
> > >
> >
> >
> > This e-mail and any files transmitted with it are confidential and may be
> > protected by legal privilege. If you are not the intended recipient,
> please
> > notify the sender and delete the e-mail from your system.
> > This e-mail has been scanned for malicious content but the internet is
> > inherently insecure and EDF Energy Limited cannot accept any liability
> for
> > the integrity of this message or its attachments. No employee or agent of
> > EDF Energy Limited or any related company is authorised to conclude any
> > binding agreement on behalf of EDF Energy Limited or any related company
> by
> > e-mail.
> > All e-mails sent and received by EDF Energy Limited are monitored to
> > ensure compliance with the company's information security policy.
> > Executable and script files are not permitted through the EDF Energy
> > Limited mail gateway. EDF Energy does not accept or send mails above 30
> Mb
> > in size.
> >
> > EDF Energy Limited Registered in England and Wales No. 2366852 Registered
> > Office: 90 Whitfield Street, London W1T 4EZ
> >
>

Re: compositeTopic and lost messages - broker restart

Posted by Tim Bain <tb...@alumni.duke.edu>.
OK, thanks for sharing what you did.

Tim

On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard <
Richard.Cannock@edfenergy.com> wrote:

> HI Tim
>
> Thanks for the advice. In the end, I got the network connector 'promoted'
> to a durable topic subscriber by adding a temporary Topic to the forward
> declaration like this:
>
> <destinationInterceptors>
>         <virtualDestinationInterceptor>
>             <virtualDestinations>
>                 <compositeTopic forwardOnly="true" name="myTopic">
>                     <forwardTo>
>                         <queue physicalName="myqyeue"/>
>                         <topic physicalName="VirtualTopic.myTopic"/>
>                     </forwardTo>
>                 </compositeTopic>
>             </virtualDestinations>
>         </virtualDestinationInterceptor>
>     </destinationInterceptors>
>
> Before attaching a durable scubscriber to this topic. Once restarted the
> broker made the network connector durable.
>
> I then removed the   <topic physicalName="VirtualTopic.myTopic"/> and
> restarted the broker again. The network connector remained durable and
> messages remained to be sent to the queue even when the broker was offline.
>
> This is clearly a workaround, so a bit nervous of it's reliability, so
> clearly an enhancement request to make this a duranble subscription might
> be best.
>
> Thanks
>
> Richard
>
> -----Original Message-----
> From: Tim Bain <tb...@alumni.duke.edu>
> Sent: 08 November 2019 07:00
> To: ActiveMQ Users <us...@activemq.apache.org>
> Subject: Re: compositeTopic and lost messages - broker restart
>
> Durable topic subscriptions are dangerous things, because if the consumer
> disappears without removing the subscription, the broker will keep the
> messages for it forever, eventually running the broker out of memory. A
> durable topic subscription between brokers (for implementing a composite
> destination) is doubly so, since the subscription isn't held by a
> traditional consumer who might give assurances of cleaning things up before
> tearing itself down. So it's no surprise to me that a non-durable
> subscription is the default behavior, since that's the safer behavior.
>
> However, I think maybe your real question isn't why this isn't the
> default, but why there isn't an option to specify the use of a durable
> subscription if you so choose. Your use case seems like a valid one for
> that particular feature, so if you submit an enhancement request, perhaps
> it could be added in a future version.
>
> In the meantime, could you make the composite topic in your broker use a
> different name than the remote topic (call it *myCompositeTopic*), and then
> have an embedded Camel route that uses a durable subscription to consume
> from *myCompositeTopic* and publishes the message to *sourceTopic*? Then
> the network of brokers can ensure that there's a durable subscription on
> the remote broker, and you'll still get the composite destination on your
> local broker. This has the additional benefit of not having *sourceTopic*
> be a composite destination on your local broker while being a non-composite
> destination on the remote broker, which is a pretty wonky configuration
> anyway.
>
> Tim
>
> On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
> Richard.Cannock@edfenergy.com> wrote:
>
> > HI,
> >
> > Further investigations indicate that the resulting network connector
> > establishes only a non durable topic subscription to the sourceTopic
> > via the CompositeTopic, which is why we do not get any messages queued
> > up at source and subsequently delivered received post the networked
> > broker restart.
> >
> > However, this does not explain why this is only a non durable topic
> > subscription, as I believe this should be a durable topic subscription
> > by default.
> >
> > I have tried this with various ActiveMQ versions upto 5.15.9 and the
> > behaviour is consistent.
> >
> > Any advice?
> >
> > Thanks
> >
> > Richard
> >
> > -----Original Message-----
> > From: Cannock, Richard <Ri...@edfenergy.com>
> > Sent: 16 October 2019 19:36
> > To: users@activemq.apache.org
> > Subject: RE: compositeTopic and lost messages - broker restart
> >
> > Apologies, mail server issue meant I wasn't sure that the original
> > message had gotten through. Obviously it had!
> >
> > These are the same issue so only looking for advice on one of them,
> > thanks.
> >
> > _______
> > From: Justin Bertram [jbertram@apache.org]
> > Sent: 16 October 2019 17:09
> > To: users@activemq.apache.org
> > Subject: Re: compositeTopic and lost messages - broker restart
> >
> > Richard, didn't you just send this same email to the ActiveMQ user
> > list (although with a slightly different subject)? Just want to
> > clarify if they are the same or different issues.
> >
> >
> > Justin
> >
> > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
> > Richard.Cannock@edfenergy.com> wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > We are trying to enlist in a network of brokers arrangement with a
> > > third party broker hosted externally to us. We wish to receive
> > > messages from a topic (called *sourceTopic*) and forward to internal
> > > queues on our local broker for load balancing and throughput
> > > purposes using virtual destinations, a compositeTopic forwarding to
> > > two local queues on our local broker.
> > >
> > >
> > >
> > > In order to avoid implementing ingress firewall rules for the third
> > > party onto our estate, we have created a duplex network connector so
> > > that we initiate the outbound tcp connection. We have excluded our
> > > local broker queues from propagating to the other broker using the
> > > <excludedDestinations> configuration element.
> > >
> > >
> > >
> > > Our configuration below works fine, as long as our local broker is
> > > up, and we have an active consumer. In this case, messages published
> > > to the third party broker *sourceTopic* are successfully received on
> > > our composite topic forwarded queues, test1 and test2.  To avoid any
> > > confusion, the source message producer publishing to *sourceTopic
> > > *is explicitly using PersistentDelivery (even though I believe that
> > > is the
> > default anyway).
> > >
> > >
> > >
> > > If we restart our broker, any messages that were published to the
> > > *sourceTopic* in the intervening period that our broker was down,
> > > are not received onto local forwarded queues test and test2.
> > >
> > >
> > >
> > > It would appear that the network connector configuration and\or
> > > compositeTopic do not result in a durable topic subscription to the
> > > *sourceTopic* on the other broker.
> > >
> > >
> > >
> > > I can also successfully recreate this behaviour with two local
> > > instances of Active MQ on my development machine running side by
> > > side with broker B duplex connected to broker A, so it doesn't
> > > appear to be an issue with the third party broker either.
> > >
> > >
> > >
> > > Our configuration (sensitive data removed) is as below:
> > >
> > >
> > >
> > > Any ideas on how we can resolve this?
> > >
> > >
> > >
> > > Many thanks
> > >
> > >
> > > Richard
> > >
> > >
> > >
> > > <beans
> > >
> > >   xmlns="http://www.springframework.org/schema/beans"
> > >
> > >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> > >
> > >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> > > http://www.springframework.org/schema/beans/spring-beans.xsd
> > >
> > >   http://activemq.apache.org/schema/core
> > > http://activemq.apache.org/schema/core/activemq-core.xsd">
> > >
> > >
> > >
> > >     <!-- Allows us to use system properties as variables in this
> > > configuration file -->
> > >
> > >     <bean
> > > class="org.springframework.beans.factory.config.PropertyPlaceholderC
> > > on
> > > figurer">
> > >
> > >         <property name="locations">
> > >
> > >
> > > <value>file:${activemq.conf}/credentials.properties</value>
> > >
> > >         </property>
> > >
> > >     </bean>
> > >
> > >
> > >
> > >    <!-- Allows accessing the server log -->
> > >
> > >     <bean id="logQuery"
> > class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> > >
> > >           lazy-init="false" scope="singleton"
> > >
> > >           init-method="start" destroy-method="stop">
> > >
> > >     </bean>
> > >
> > >
> > >
> > >     <!--
> > >
> > >         The <broker> element is used to configure the ActiveMQ broker.
> > >
> > >     -->
> > >
> > >     <broker xmlns="http://activemq.apache.org/schema/core"
> > > brokerName="localhost" dataDirectory="${activemq.data}"
> > > useVirtualDestSubs="true">
> > >
> > >
> > >
> > >         <destinationPolicy>
> > >
> > >             <policyMap>
> > >
> > >               <policyEntries>
> > >
> > >                 <policyEntry topic=">" >
> > >
> > >                     <!-- The constantPendingMessageLimitStrategy is
> > > used to prevent
> > >
> > >                          slow topic consumers to block producers and
> > > affect other consumers
> > >
> > >                          by limiting the number of messages that are
> > > retained
> > >
> > >                          For more information, see:
> > >
> > >
> > >
> > >
> > > http://activemq.apache.org/slow-consumer-handling.html
> > >
> > >
> > >
> > >                     -->
> > >
> > >                   <pendingMessageLimitStrategy>
> > >
> > >                     <constantPendingMessageLimitStrategy
> > > limit="1000"/>
> > >
> > >                   </pendingMessageLimitStrategy>
> > >
> > >                 </policyEntry>
> > >
> > >
> > > <policyEntry queue="test" enableAudit="false">
> > >
> > >
> > > <networkBridgeFilterFactory>
> > >
> > >
> > > <conditionalNetworkBridgeFilterFactory
> > > replayWhenNoConsumers="true"/>
> > >
> > >
> > > </networkBridgeFilterFactory>
> > >
> > >
> > > </policyEntry>
> > >
> > >
> > > <policyEntry topic="*sourceTopic*" enableAudit="false">
> > >
> > >
> > > <networkBridgeFilterFactory>
> > >
> > >
> > > <conditionalNetworkBridgeFilterFactory
> > > replayWhenNoConsumers="true"/>
> > >
> > >
> > > </networkBridgeFilterFactory>
> > >
> > >
> > > </policyEntry>
> > >
> > >               </policyEntries>
> > >
> > >             </policyMap>
> > >
> > >         </destinationPolicy>
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >                                 <destinationInterceptors>
> > >
> > >
> > > <virtualDestinationInterceptor>
> > >
> > >
> > > <virtualDestinations>
> > >
> > >
> > > <compositeTopic forwardOnly="true" name="* sourceTopic*">
> > >
> > >
> > > <forwardTo>
> > >
> > >
> > > <queue physicalName="test" />
> > >
> > >
> > >
> > > <queue physicalName="test2" />
> > >
> > >
> > > </forwardTo>
> > >
> > >
> > > </compositeTopic>
> > >
> > >
> > > </virtualDestinations>
> > >
> > >
> > > </virtualDestinationInterceptor>
> > >
> > >                                 </destinationInterceptors>
> > >
> > >
> > >
> > >         <!--
> > >
> > >             The managementContext is used to configure how ActiveMQ
> > > is exposed in
> > >
> > >             JMX. By default, ActiveMQ uses the MBean server that is
> > > started by
> > >
> > >             the JVM. For more information, see:
> > >
> > >
> > >
> > >            http://activemq.apache.org/jmx.html
> > >
> > >         -->
> > >
> > >         <managementContext>
> > >
> > >             <managementContext createConnector="false"/>
> > >
> > >         </managementContext>
> > >
> > >
> > >
> > >         <!--
> > >
> > >             Configure message persistence for the broker. The
> > > default persistence
> > >
> > >             mechanism is the KahaDB store (identified by the kahaDB
> tag).
> > >
> > >             For more information, see:
> > >
> > >
> > >
> > >             http://activemq.apache.org/persistence.html
> > >
> > >         -->
> > >
> > >         <persistenceAdapter>
> > >
> > >             <kahaDB directory="${activemq.data}/kahadb"/>
> > >
> > >         </persistenceAdapter>
> > >
> > >
> > >
> > >
> > >
> > >           <!--
> > >
> > >             The systemUsage controls the maximum amount of space the
> > > broker will
> > >
> > >             use before disabling caching and/or slowing down producers.
> > > For more information, see:
> > >
> > >             http://activemq.apache.org/producer-flow-control.html
> > >
> > >           -->
> > >
> > >           <systemUsage>
> > >
> > >             <systemUsage>
> > >
> > >                 <memoryUsage>
> > >
> > >                     <memoryUsage percentOfJvmHeap="70" />
> > >
> > >                 </memoryUsage>
> > >
> > >                 <storeUsage>
> > >
> > >                     <storeUsage limit="100 gb"/>
> > >
> > >                 </storeUsage>
> > >
> > >                 <tempUsage>
> > >
> > >                     <tempUsage limit="50 gb"/>
> > >
> > >                 </tempUsage>
> > >
> > >             </systemUsage>
> > >
> > >         </systemUsage>
> > >
> > >
> > >
> > >         <!--
> > >
> > >             The transport connectors expose ActiveMQ over a given
> > > protocol to
> > >
> > >             clients and other brokers. For more information, see:
> > >
> > >
> > >
> > >             http://activemq.apache.org/configuring-transports.html
> > >
> > >         -->
> > >
> > >         <transportConnectors>
> > >
> > >             <!-- DOS protection, limit concurrent connections to
> > > 1000 and frame size to 100MB -->
> > >
> > >             <transportConnector name="openwire" uri="tcp://
> > > 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > 48
> > > 57600
> > > "/>
> > >
> > >             <!--
> > >
> > >                                                 <transportConnector
> > > name="amqp" uri="amqp://
> > > 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > 85
> > > 7600
> > > "/>
> > >
> > >             <transportConnector name="stomp" uri="stomp://
> > > 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > 48
> > > 57600
> > > "/>
> > >
> > >             <transportConnector name="mqtt" uri="mqtt://
> > > 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > 85
> > > 7600
> > > "/>
> > >
> > >             <transportConnector name="ws" uri="ws://
> > > 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > 48
> > > 57600
> > > "/>
> > >
> > >                                                 -->
> > >
> > >         </transportConnectors>
> > >
> > >
> > >
> > >         <!-- destroy the spring context on shutdown to stop jetty
> > > -->
> > >
> > >         <shutdownHooks>
> > >
> > >             <bean xmlns="http://www.springframework.org/schema/beans"
> > > class="org.apache.activemq.hooks.SpringContextHook" />
> > >
> > >         </shutdownHooks>
> > >
> > >
> > >
> > >                                 <networkConnectors>
> > >
> > >                                                 <networkConnector
> > > name="edf-to-thirdpartybroker-network-connector"
> > >
> > >
>  useVirtualDestSubs="true"
> > >
> > >                                                 duplex="true"
> > >
> > >                                                 uri="static:(tcp://
> > > 127.0.0.1:61616)" userName="omitted" password="omitted" >
> > >
> > >
> > > <staticallyIncludedDestinations>
> > >
> > >
> > > <topic physicalName="*sourceTopic*"/>
> > >
> > >
> > > </staticallyIncludedDestinations>
> > >
> > >
> > > <excludedDestinations>
> > >
> > >
> > > <queue physicalName=">"/>
> > >
> > >
> > > </excludedDestinations>
> > >
> > >
> > >                                                 </networkConnector>
> > >
> > >                                 </networkConnectors>
> > >
> > >
> > >
> > >                                 <plugins>
> > >
> > >
> > > <simpleAuthenticationPlugin>
> > >
> > >                                                   <users>
> > >
> > >
> > > <authenticationUser username="system" password="omitted"
> > > groups="users,admins"/>
> > >
> > >
> > > <authenticationUser username="queueAUser" password="omitted"
> > > groups="queueAusers"/>
> > >
> > >
> > > <authenticationUser username="queueBUser" password="omitted"
> > > groups="queueBusers"/>
> > >
> > >                                                   </users>
> > >
> > >
> > > </simpleAuthenticationPlugin>
> > >
> > >
> > >
> > >
> > > <authorizationPlugin>
> > >
> > >         <map>
> > >
> > >           <authorizationMap>
> > >
> > >             <authorizationEntries>
> > >
> > >               <authorizationEntry queue=">" read="admins"
> write="admins"
> > > admin="admins" />
> > >
> > >               <authorizationEntry queue="test" read="queueAusers"
> > > write="queueAusers" admin="queueAUsers" />
> > >
> > >
> > > <authorizationEntry queue="test2" read="queueBusers"
> write="queueBusers"
> > > admin="queueBusers" />
> > >
> > >
> > >
> > >               <authorizationEntry topic=">" read="admins"
> write="admins"
> > > admin="admins" />
> > >
> > >
> > >
> > >
> > >
> > >               <authorizationEntry topic="ActiveMQ.Advisory.>"
> > > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> > > admin="admins,queueAusers,queueBusers" />
> > >
> > >             </authorizationEntries>
> > >
> > >
> > >
> > >             <!-- let's assign roles to temporary destinations.
> > > comment this entry if we don't want any roles assigned to temp
> > > destinations
> > > -->
> > >
> > >
> > >
> > >           </authorizationMap>
> > >
> > >         </map>
> > >
> > >       </authorizationPlugin>
> > >
> > >
> > >
> > >
> > >
> > >                                 </plugins>
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >                 </broker>
> > >
> > >
> > >
> > >     <!--
> > >
> > >         Enable web consoles, REST and Ajax APIs and demos
> > >
> > >         The web consoles requires by default login, you can disable
> > > this in the jetty.xml file
> > >
> > >
> > >
> > >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more
> > > details
> > >
> > >     -->
> > >
> > >     <import resource="jetty.xml"/>
> > >
> > >
> > >
> > > </beans>
> > >
> > > <!-- END SNIPPET: example -->
> > >
> > >
> > >
> > > *Richard Cannock*
> > >
> > > *Solution Architect*
> > >
> > > Business Change and IT
> > >
> > > Customers
> > >
> > >
> > >
> > > *T:* +44 (0) 2081862465
> > >
> > > *M:* +44 (0)7729 320505
> > >
> > >
> > >
> > > Barnwood
> > >
> > > Barnett Way
> > >
> > > Gloucester, Gloucestershire, GL4 3RS
> > >
> > >
> > >
> > > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
> > >
> > >
> > >
> > > edfenergy.com <http://www.edfenergy.com/>
> > >
> > >
> > >
> > > Please consider the environment before printing this email
> > >
> > >
> > >
> > > This e-mail and any files transmitted with it are confidential and
> > > may be protected by legal privilege. If you are not the intended
> > > recipient, please notify the sender and delete the e-mail from your
> > system.
> > > This e-mail has been scanned for malicious content but the internet
> > > is inherently insecure and EDF Energy Limited can not accept any
> > > liability for the integrity of this message or its attachments.
> > > No employee or agent of EDF Energy Limited or any related company is
> > > authorised to conclude any binding agreement on behalf of EDF Energy
> > > Limited or any related company by e-mail.
> > >
> > > All e-mails sent and received by EDF Energy Limited are monitored to
> > > ensure compliance with the company's information security policy.
> > > Executable and script files are not permitted through the EDF Energy
> > > Limited mail gateway.  EDF Energy does not accept or send mails
> > > above
> > > 30 Mb in size.
> > >
> > > EDF Energy Limited
> > > Registered in England and Wales No. 2366852 Registered Office: 90
> > > Whitfield Street, London W1T 4EZ
> > >
> >
> >
> > This e-mail and any files transmitted with it are confidential and may
> > be protected by legal privilege. If you are not the intended
> > recipient, please notify the sender and delete the e-mail from your
> system.
> > This e-mail has been scanned for malicious content but the internet is
> > inherently insecure and EDF Energy Limited cannot accept any liability
> > for the integrity of this message or its attachments. No employee or
> > agent of EDF Energy Limited or any related company is authorised to
> > conclude any binding agreement on behalf of EDF Energy Limited or any
> > related company by e-mail.
> > All e-mails sent and received by EDF Energy Limited are monitored to
> > ensure compliance with the company's information security policy.
> > Executable and script files are not permitted through the EDF Energy
> > Limited mail gateway. EDF Energy does not accept or send mails above
> > 30 Mb in size.
> >
> > EDF Energy Limited Registered in England and Wales No. 2366852
> > Registered
> > Office: 90 Whitfield Street, London W1T 4EZ
> >
>
>
> This e-mail and any files transmitted with it are confidential and may be
> protected by legal privilege. If you are not the intended recipient, please
> notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is
> inherently insecure and EDF Energy Limited cannot accept any liability for
> the integrity of this message or its attachments. No employee or agent of
> EDF Energy Limited or any related company is authorised to conclude any
> binding agreement on behalf of EDF Energy Limited or any related company by
> e-mail.
> All e-mails sent and received by EDF Energy Limited are monitored to
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy
> Limited mail gateway. EDF Energy does not accept or send mails above 30 Mb
> in size.
>
> EDF Energy Limited Registered in England and Wales No. 2366852 Registered
> Office: 90 Whitfield Street, London W1T 4EZ
>

RE: compositeTopic and lost messages - broker restart

Posted by "Cannock, Richard" <Ri...@edfenergy.com>.
HI Tim

Thanks for the advice. In the end, I got the network connector 'promoted' to a durable topic subscriber by adding a temporary Topic to the forward declaration like this:

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeTopic forwardOnly="true" name="myTopic">
                    <forwardTo>
                        <queue physicalName="myqyeue"/>
                        <topic physicalName="VirtualTopic.myTopic"/>
                    </forwardTo>
                </compositeTopic>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>

Before attaching a durable scubscriber to this topic. Once restarted the broker made the network connector durable.

I then removed the   <topic physicalName="VirtualTopic.myTopic"/> and restarted the broker again. The network connector remained durable and messages remained to be sent to the queue even when the broker was offline.

This is clearly a workaround, so a bit nervous of it's reliability, so clearly an enhancement request to make this a duranble subscription might be best.

Thanks

Richard

-----Original Message-----
From: Tim Bain <tb...@alumni.duke.edu> 
Sent: 08 November 2019 07:00
To: ActiveMQ Users <us...@activemq.apache.org>
Subject: Re: compositeTopic and lost messages - broker restart

Durable topic subscriptions are dangerous things, because if the consumer disappears without removing the subscription, the broker will keep the messages for it forever, eventually running the broker out of memory. A durable topic subscription between brokers (for implementing a composite
destination) is doubly so, since the subscription isn't held by a traditional consumer who might give assurances of cleaning things up before tearing itself down. So it's no surprise to me that a non-durable subscription is the default behavior, since that's the safer behavior.

However, I think maybe your real question isn't why this isn't the default, but why there isn't an option to specify the use of a durable subscription if you so choose. Your use case seems like a valid one for that particular feature, so if you submit an enhancement request, perhaps it could be added in a future version.

In the meantime, could you make the composite topic in your broker use a different name than the remote topic (call it *myCompositeTopic*), and then have an embedded Camel route that uses a durable subscription to consume from *myCompositeTopic* and publishes the message to *sourceTopic*? Then the network of brokers can ensure that there's a durable subscription on the remote broker, and you'll still get the composite destination on your local broker. This has the additional benefit of not having *sourceTopic* be a composite destination on your local broker while being a non-composite destination on the remote broker, which is a pretty wonky configuration anyway.

Tim

On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard < Richard.Cannock@edfenergy.com> wrote:

> HI,
>
> Further investigations indicate that the resulting network connector 
> establishes only a non durable topic subscription to the sourceTopic 
> via the CompositeTopic, which is why we do not get any messages queued 
> up at source and subsequently delivered received post the networked 
> broker restart.
>
> However, this does not explain why this is only a non durable topic 
> subscription, as I believe this should be a durable topic subscription 
> by default.
>
> I have tried this with various ActiveMQ versions upto 5.15.9 and the 
> behaviour is consistent.
>
> Any advice?
>
> Thanks
>
> Richard
>
> -----Original Message-----
> From: Cannock, Richard <Ri...@edfenergy.com>
> Sent: 16 October 2019 19:36
> To: users@activemq.apache.org
> Subject: RE: compositeTopic and lost messages - broker restart
>
> Apologies, mail server issue meant I wasn't sure that the original 
> message had gotten through. Obviously it had!
>
> These are the same issue so only looking for advice on one of them, 
> thanks.
>
> _______
> From: Justin Bertram [jbertram@apache.org]
> Sent: 16 October 2019 17:09
> To: users@activemq.apache.org
> Subject: Re: compositeTopic and lost messages - broker restart
>
> Richard, didn't you just send this same email to the ActiveMQ user 
> list (although with a slightly different subject)? Just want to 
> clarify if they are the same or different issues.
>
>
> Justin
>
> On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard < 
> Richard.Cannock@edfenergy.com> wrote:
>
> > Hi,
> >
> >
> >
> > We are trying to enlist in a network of brokers arrangement with a 
> > third party broker hosted externally to us. We wish to receive 
> > messages from a topic (called *sourceTopic*) and forward to internal 
> > queues on our local broker for load balancing and throughput 
> > purposes using virtual destinations, a compositeTopic forwarding to 
> > two local queues on our local broker.
> >
> >
> >
> > In order to avoid implementing ingress firewall rules for the third 
> > party onto our estate, we have created a duplex network connector so 
> > that we initiate the outbound tcp connection. We have excluded our 
> > local broker queues from propagating to the other broker using the 
> > <excludedDestinations> configuration element.
> >
> >
> >
> > Our configuration below works fine, as long as our local broker is 
> > up, and we have an active consumer. In this case, messages published 
> > to the third party broker *sourceTopic* are successfully received on 
> > our composite topic forwarded queues, test1 and test2.  To avoid any 
> > confusion, the source message producer publishing to *sourceTopic 
> > *is explicitly using PersistentDelivery (even though I believe that 
> > is the
> default anyway).
> >
> >
> >
> > If we restart our broker, any messages that were published to the
> > *sourceTopic* in the intervening period that our broker was down, 
> > are not received onto local forwarded queues test and test2.
> >
> >
> >
> > It would appear that the network connector configuration and\or 
> > compositeTopic do not result in a durable topic subscription to the
> > *sourceTopic* on the other broker.
> >
> >
> >
> > I can also successfully recreate this behaviour with two local 
> > instances of Active MQ on my development machine running side by 
> > side with broker B duplex connected to broker A, so it doesn't 
> > appear to be an issue with the third party broker either.
> >
> >
> >
> > Our configuration (sensitive data removed) is as below:
> >
> >
> >
> > Any ideas on how we can resolve this?
> >
> >
> >
> > Many thanks
> >
> >
> > Richard
> >
> >
> >
> > <beans
> >
> >   xmlns="http://www.springframework.org/schema/beans"
> >
> >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >
> >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> > http://www.springframework.org/schema/beans/spring-beans.xsd
> >
> >   http://activemq.apache.org/schema/core
> > http://activemq.apache.org/schema/core/activemq-core.xsd">
> >
> >
> >
> >     <!-- Allows us to use system properties as variables in this 
> > configuration file -->
> >
> >     <bean
> > class="org.springframework.beans.factory.config.PropertyPlaceholderC
> > on
> > figurer">
> >
> >         <property name="locations">
> >
> >
> > <value>file:${activemq.conf}/credentials.properties</value>
> >
> >         </property>
> >
> >     </bean>
> >
> >
> >
> >    <!-- Allows accessing the server log -->
> >
> >     <bean id="logQuery"
> class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> >
> >           lazy-init="false" scope="singleton"
> >
> >           init-method="start" destroy-method="stop">
> >
> >     </bean>
> >
> >
> >
> >     <!--
> >
> >         The <broker> element is used to configure the ActiveMQ broker.
> >
> >     -->
> >
> >     <broker xmlns="http://activemq.apache.org/schema/core"
> > brokerName="localhost" dataDirectory="${activemq.data}"
> > useVirtualDestSubs="true">
> >
> >
> >
> >         <destinationPolicy>
> >
> >             <policyMap>
> >
> >               <policyEntries>
> >
> >                 <policyEntry topic=">" >
> >
> >                     <!-- The constantPendingMessageLimitStrategy is 
> > used to prevent
> >
> >                          slow topic consumers to block producers and 
> > affect other consumers
> >
> >                          by limiting the number of messages that are 
> > retained
> >
> >                          For more information, see:
> >
> >
> >
> >
> > http://activemq.apache.org/slow-consumer-handling.html
> >
> >
> >
> >                     -->
> >
> >                   <pendingMessageLimitStrategy>
> >
> >                     <constantPendingMessageLimitStrategy
> > limit="1000"/>
> >
> >                   </pendingMessageLimitStrategy>
> >
> >                 </policyEntry>
> >
> >
> > <policyEntry queue="test" enableAudit="false">
> >
> >
> > <networkBridgeFilterFactory>
> >
> >
> > <conditionalNetworkBridgeFilterFactory 
> > replayWhenNoConsumers="true"/>
> >
> >
> > </networkBridgeFilterFactory>
> >
> >
> > </policyEntry>
> >
> >
> > <policyEntry topic="*sourceTopic*" enableAudit="false">
> >
> >
> > <networkBridgeFilterFactory>
> >
> >
> > <conditionalNetworkBridgeFilterFactory 
> > replayWhenNoConsumers="true"/>
> >
> >
> > </networkBridgeFilterFactory>
> >
> >
> > </policyEntry>
> >
> >               </policyEntries>
> >
> >             </policyMap>
> >
> >         </destinationPolicy>
> >
> >
> >
> >
> >
> >
> >
> >                                 <destinationInterceptors>
> >
> >
> > <virtualDestinationInterceptor>
> >
> >
> > <virtualDestinations>
> >
> >
> > <compositeTopic forwardOnly="true" name="* sourceTopic*">
> >
> >
> > <forwardTo>
> >
> >
> > <queue physicalName="test" />
> >
> >
> >                                                                 
> > <queue physicalName="test2" />
> >
> >
> > </forwardTo>
> >
> >
> > </compositeTopic>
> >
> >
> > </virtualDestinations>
> >
> >
> > </virtualDestinationInterceptor>
> >
> >                                 </destinationInterceptors>
> >
> >
> >
> >         <!--
> >
> >             The managementContext is used to configure how ActiveMQ 
> > is exposed in
> >
> >             JMX. By default, ActiveMQ uses the MBean server that is 
> > started by
> >
> >             the JVM. For more information, see:
> >
> >
> >
> >            http://activemq.apache.org/jmx.html
> >
> >         -->
> >
> >         <managementContext>
> >
> >             <managementContext createConnector="false"/>
> >
> >         </managementContext>
> >
> >
> >
> >         <!--
> >
> >             Configure message persistence for the broker. The 
> > default persistence
> >
> >             mechanism is the KahaDB store (identified by the kahaDB tag).
> >
> >             For more information, see:
> >
> >
> >
> >             http://activemq.apache.org/persistence.html
> >
> >         -->
> >
> >         <persistenceAdapter>
> >
> >             <kahaDB directory="${activemq.data}/kahadb"/>
> >
> >         </persistenceAdapter>
> >
> >
> >
> >
> >
> >           <!--
> >
> >             The systemUsage controls the maximum amount of space the 
> > broker will
> >
> >             use before disabling caching and/or slowing down producers.
> > For more information, see:
> >
> >             http://activemq.apache.org/producer-flow-control.html
> >
> >           -->
> >
> >           <systemUsage>
> >
> >             <systemUsage>
> >
> >                 <memoryUsage>
> >
> >                     <memoryUsage percentOfJvmHeap="70" />
> >
> >                 </memoryUsage>
> >
> >                 <storeUsage>
> >
> >                     <storeUsage limit="100 gb"/>
> >
> >                 </storeUsage>
> >
> >                 <tempUsage>
> >
> >                     <tempUsage limit="50 gb"/>
> >
> >                 </tempUsage>
> >
> >             </systemUsage>
> >
> >         </systemUsage>
> >
> >
> >
> >         <!--
> >
> >             The transport connectors expose ActiveMQ over a given 
> > protocol to
> >
> >             clients and other brokers. For more information, see:
> >
> >
> >
> >             http://activemq.apache.org/configuring-transports.html
> >
> >         -->
> >
> >         <transportConnectors>
> >
> >             <!-- DOS protection, limit concurrent connections to 
> > 1000 and frame size to 100MB -->
> >
> >             <transportConnector name="openwire" uri="tcp://
> > 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > 48
> > 57600
> > "/>
> >
> >             <!--
> >
> >                                                 <transportConnector 
> > name="amqp" uri="amqp://
> > 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > 85
> > 7600
> > "/>
> >
> >             <transportConnector name="stomp" uri="stomp://
> > 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > 48
> > 57600
> > "/>
> >
> >             <transportConnector name="mqtt" uri="mqtt://
> > 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > 85
> > 7600
> > "/>
> >
> >             <transportConnector name="ws" uri="ws://
> > 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > 48
> > 57600
> > "/>
> >
> >                                                 -->
> >
> >         </transportConnectors>
> >
> >
> >
> >         <!-- destroy the spring context on shutdown to stop jetty 
> > -->
> >
> >         <shutdownHooks>
> >
> >             <bean xmlns="http://www.springframework.org/schema/beans"
> > class="org.apache.activemq.hooks.SpringContextHook" />
> >
> >         </shutdownHooks>
> >
> >
> >
> >                                 <networkConnectors>
> >
> >                                                 <networkConnector 
> > name="edf-to-thirdpartybroker-network-connector"
> >
> >                                                 useVirtualDestSubs="true"
> >
> >                                                 duplex="true"
> >
> >                                                 uri="static:(tcp:// 
> > 127.0.0.1:61616)" userName="omitted" password="omitted" >
> >
> >
> > <staticallyIncludedDestinations>
> >
> >
> > <topic physicalName="*sourceTopic*"/>
> >
> >
> > </staticallyIncludedDestinations>
> >
> >
> > <excludedDestinations>
> >
> >
> > <queue physicalName=">"/>
> >
> >
> > </excludedDestinations>
> >
> >
> >                                                 </networkConnector>
> >
> >                                 </networkConnectors>
> >
> >
> >
> >                                 <plugins>
> >
> >
> > <simpleAuthenticationPlugin>
> >
> >                                                   <users>
> >
> >
> > <authenticationUser username="system" password="omitted"
> > groups="users,admins"/>
> >
> >
> > <authenticationUser username="queueAUser" password="omitted"
> > groups="queueAusers"/>
> >
> >
> > <authenticationUser username="queueBUser" password="omitted"
> > groups="queueBusers"/>
> >
> >                                                   </users>
> >
> >
> > </simpleAuthenticationPlugin>
> >
> >
> >
> >                                                 
> > <authorizationPlugin>
> >
> >         <map>
> >
> >           <authorizationMap>
> >
> >             <authorizationEntries>
> >
> >               <authorizationEntry queue=">" read="admins" write="admins"
> > admin="admins" />
> >
> >               <authorizationEntry queue="test" read="queueAusers"
> > write="queueAusers" admin="queueAUsers" />
> >
> >                                                   
> > <authorizationEntry queue="test2" read="queueBusers" write="queueBusers"
> > admin="queueBusers" />
> >
> >
> >
> >               <authorizationEntry topic=">" read="admins" write="admins"
> > admin="admins" />
> >
> >
> >
> >
> >
> >               <authorizationEntry topic="ActiveMQ.Advisory.>"
> > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> > admin="admins,queueAusers,queueBusers" />
> >
> >             </authorizationEntries>
> >
> >
> >
> >             <!-- let's assign roles to temporary destinations. 
> > comment this entry if we don't want any roles assigned to temp 
> > destinations
> > -->
> >
> >
> >
> >           </authorizationMap>
> >
> >         </map>
> >
> >       </authorizationPlugin>
> >
> >
> >
> >
> >
> >                                 </plugins>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >                 </broker>
> >
> >
> >
> >     <!--
> >
> >         Enable web consoles, REST and Ajax APIs and demos
> >
> >         The web consoles requires by default login, you can disable 
> > this in the jetty.xml file
> >
> >
> >
> >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more 
> > details
> >
> >     -->
> >
> >     <import resource="jetty.xml"/>
> >
> >
> >
> > </beans>
> >
> > <!-- END SNIPPET: example -->
> >
> >
> >
> > *Richard Cannock*
> >
> > *Solution Architect*
> >
> > Business Change and IT
> >
> > Customers
> >
> >
> >
> > *T:* +44 (0) 2081862465
> >
> > *M:* +44 (0)7729 320505
> >
> >
> >
> > Barnwood
> >
> > Barnett Way
> >
> > Gloucester, Gloucestershire, GL4 3RS
> >
> >
> >
> > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
> >
> >
> >
> > edfenergy.com <http://www.edfenergy.com/>
> >
> >
> >
> > Please consider the environment before printing this email
> >
> >
> >
> > This e-mail and any files transmitted with it are confidential and 
> > may be protected by legal privilege. If you are not the intended 
> > recipient, please notify the sender and delete the e-mail from your
> system.
> > This e-mail has been scanned for malicious content but the internet 
> > is inherently insecure and EDF Energy Limited can not accept any 
> > liability for the integrity of this message or its attachments.
> > No employee or agent of EDF Energy Limited or any related company is 
> > authorised to conclude any binding agreement on behalf of EDF Energy 
> > Limited or any related company by e-mail.
> >
> > All e-mails sent and received by EDF Energy Limited are monitored to 
> > ensure compliance with the company's information security policy.
> > Executable and script files are not permitted through the EDF Energy 
> > Limited mail gateway.  EDF Energy does not accept or send mails 
> > above
> > 30 Mb in size.
> >
> > EDF Energy Limited
> > Registered in England and Wales No. 2366852 Registered Office: 90 
> > Whitfield Street, London W1T 4EZ
> >
>
>
> This e-mail and any files transmitted with it are confidential and may 
> be protected by legal privilege. If you are not the intended 
> recipient, please notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is 
> inherently insecure and EDF Energy Limited cannot accept any liability 
> for the integrity of this message or its attachments. No employee or 
> agent of EDF Energy Limited or any related company is authorised to 
> conclude any binding agreement on behalf of EDF Energy Limited or any 
> related company by e-mail.
> All e-mails sent and received by EDF Energy Limited are monitored to 
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy 
> Limited mail gateway. EDF Energy does not accept or send mails above 
> 30 Mb in size.
>
> EDF Energy Limited Registered in England and Wales No. 2366852 
> Registered
> Office: 90 Whitfield Street, London W1T 4EZ
>


This e-mail and any files transmitted with it are confidential and may be protected by legal privilege. If you are not the intended recipient, please notify the sender and delete the e-mail from your system.
This e-mail has been scanned for malicious content but the internet is inherently insecure and EDF Energy Limited cannot accept any liability for the integrity of this message or its attachments. No employee or agent of EDF Energy Limited or any related company is authorised to conclude any binding agreement on behalf of EDF Energy Limited or any related company by e-mail.
All e-mails sent and received by EDF Energy Limited are monitored to ensure compliance with the company's information security policy. Executable and script files are not permitted through the EDF Energy Limited mail gateway. EDF Energy does not accept or send mails above 30 Mb in size.

EDF Energy Limited Registered in England and Wales No. 2366852 Registered Office: 90 Whitfield Street, London W1T 4EZ

Re: compositeTopic and lost messages - broker restart

Posted by Tim Bain <tb...@alumni.duke.edu>.
Durable topic subscriptions are dangerous things, because if the consumer
disappears without removing the subscription, the broker will keep the
messages for it forever, eventually running the broker out of memory. A
durable topic subscription between brokers (for implementing a composite
destination) is doubly so, since the subscription isn't held by a
traditional consumer who might give assurances of cleaning things up before
tearing itself down. So it's no surprise to me that a non-durable
subscription is the default behavior, since that's the safer behavior.

However, I think maybe your real question isn't why this isn't the default,
but why there isn't an option to specify the use of a durable subscription
if you so choose. Your use case seems like a valid one for that particular
feature, so if you submit an enhancement request, perhaps it could be added
in a future version.

In the meantime, could you make the composite topic in your broker use a
different name than the remote topic (call it *myCompositeTopic*), and then
have an embedded Camel route that uses a durable subscription to consume
from *myCompositeTopic* and publishes the message to *sourceTopic*? Then
the network of brokers can ensure that there's a durable subscription on
the remote broker, and you'll still get the composite destination on your
local broker. This has the additional benefit of not having *sourceTopic*
be a composite destination on your local broker while being a non-composite
destination on the remote broker, which is a pretty wonky configuration
anyway.

Tim

On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
Richard.Cannock@edfenergy.com> wrote:

> HI,
>
> Further investigations indicate that the resulting network connector
> establishes only a non durable topic subscription to the sourceTopic via
> the CompositeTopic, which is why we do not get any messages queued up at
> source and subsequently delivered received post the networked broker
> restart.
>
> However, this does not explain why this is only a non durable topic
> subscription, as I believe this should be a durable topic subscription by
> default.
>
> I have tried this with various ActiveMQ versions upto 5.15.9 and the
> behaviour is consistent.
>
> Any advice?
>
> Thanks
>
> Richard
>
> -----Original Message-----
> From: Cannock, Richard <Ri...@edfenergy.com>
> Sent: 16 October 2019 19:36
> To: users@activemq.apache.org
> Subject: RE: compositeTopic and lost messages - broker restart
>
> Apologies, mail server issue meant I wasn't sure that the original message
> had gotten through. Obviously it had!
>
> These are the same issue so only looking for advice on one of them,
> thanks.
>
> _______
> From: Justin Bertram [jbertram@apache.org]
> Sent: 16 October 2019 17:09
> To: users@activemq.apache.org
> Subject: Re: compositeTopic and lost messages - broker restart
>
> Richard, didn't you just send this same email to the ActiveMQ user list
> (although with a slightly different subject)? Just want to clarify if they
> are the same or different issues.
>
>
> Justin
>
> On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
> Richard.Cannock@edfenergy.com> wrote:
>
> > Hi,
> >
> >
> >
> > We are trying to enlist in a network of brokers arrangement with a
> > third party broker hosted externally to us. We wish to receive
> > messages from a topic (called *sourceTopic*) and forward to internal
> > queues on our local broker for load balancing and throughput purposes
> > using virtual destinations, a compositeTopic forwarding to two local
> > queues on our local broker.
> >
> >
> >
> > In order to avoid implementing ingress firewall rules for the third
> > party onto our estate, we have created a duplex network connector so
> > that we initiate the outbound tcp connection. We have excluded our
> > local broker queues from propagating to the other broker using the
> > <excludedDestinations> configuration element.
> >
> >
> >
> > Our configuration below works fine, as long as our local broker is up,
> > and we have an active consumer. In this case, messages published to
> > the third party broker *sourceTopic* are successfully received on our
> > composite topic forwarded queues, test1 and test2.  To avoid any
> > confusion, the source message producer publishing to *sourceTopic *is
> > explicitly using PersistentDelivery (even though I believe that is the
> default anyway).
> >
> >
> >
> > If we restart our broker, any messages that were published to the
> > *sourceTopic* in the intervening period that our broker was down, are
> > not received onto local forwarded queues test and test2.
> >
> >
> >
> > It would appear that the network connector configuration and\or
> > compositeTopic do not result in a durable topic subscription to the
> > *sourceTopic* on the other broker.
> >
> >
> >
> > I can also successfully recreate this behaviour with two local
> > instances of Active MQ on my development machine running side by side
> > with broker B duplex connected to broker A, so it doesn't appear to be
> > an issue with the third party broker either.
> >
> >
> >
> > Our configuration (sensitive data removed) is as below:
> >
> >
> >
> > Any ideas on how we can resolve this?
> >
> >
> >
> > Many thanks
> >
> >
> > Richard
> >
> >
> >
> > <beans
> >
> >   xmlns="http://www.springframework.org/schema/beans"
> >
> >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >
> >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> > http://www.springframework.org/schema/beans/spring-beans.xsd
> >
> >   http://activemq.apache.org/schema/core
> > http://activemq.apache.org/schema/core/activemq-core.xsd">
> >
> >
> >
> >     <!-- Allows us to use system properties as variables in this
> > configuration file -->
> >
> >     <bean
> > class="org.springframework.beans.factory.config.PropertyPlaceholderCon
> > figurer">
> >
> >         <property name="locations">
> >
> >
> > <value>file:${activemq.conf}/credentials.properties</value>
> >
> >         </property>
> >
> >     </bean>
> >
> >
> >
> >    <!-- Allows accessing the server log -->
> >
> >     <bean id="logQuery"
> class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> >
> >           lazy-init="false" scope="singleton"
> >
> >           init-method="start" destroy-method="stop">
> >
> >     </bean>
> >
> >
> >
> >     <!--
> >
> >         The <broker> element is used to configure the ActiveMQ broker.
> >
> >     -->
> >
> >     <broker xmlns="http://activemq.apache.org/schema/core"
> > brokerName="localhost" dataDirectory="${activemq.data}"
> > useVirtualDestSubs="true">
> >
> >
> >
> >         <destinationPolicy>
> >
> >             <policyMap>
> >
> >               <policyEntries>
> >
> >                 <policyEntry topic=">" >
> >
> >                     <!-- The constantPendingMessageLimitStrategy is
> > used to prevent
> >
> >                          slow topic consumers to block producers and
> > affect other consumers
> >
> >                          by limiting the number of messages that are
> > retained
> >
> >                          For more information, see:
> >
> >
> >
> >
> > http://activemq.apache.org/slow-consumer-handling.html
> >
> >
> >
> >                     -->
> >
> >                   <pendingMessageLimitStrategy>
> >
> >                     <constantPendingMessageLimitStrategy
> > limit="1000"/>
> >
> >                   </pendingMessageLimitStrategy>
> >
> >                 </policyEntry>
> >
> >
> > <policyEntry queue="test" enableAudit="false">
> >
> >
> > <networkBridgeFilterFactory>
> >
> >
> > <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
> >
> >
> > </networkBridgeFilterFactory>
> >
> >
> > </policyEntry>
> >
> >
> > <policyEntry topic="*sourceTopic*" enableAudit="false">
> >
> >
> > <networkBridgeFilterFactory>
> >
> >
> > <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
> >
> >
> > </networkBridgeFilterFactory>
> >
> >
> > </policyEntry>
> >
> >               </policyEntries>
> >
> >             </policyMap>
> >
> >         </destinationPolicy>
> >
> >
> >
> >
> >
> >
> >
> >                                 <destinationInterceptors>
> >
> >
> > <virtualDestinationInterceptor>
> >
> >
> > <virtualDestinations>
> >
> >
> > <compositeTopic forwardOnly="true" name="* sourceTopic*">
> >
> >
> > <forwardTo>
> >
> >
> > <queue physicalName="test" />
> >
> >
> >                                                                 <queue
> > physicalName="test2" />
> >
> >
> > </forwardTo>
> >
> >
> > </compositeTopic>
> >
> >
> > </virtualDestinations>
> >
> >
> > </virtualDestinationInterceptor>
> >
> >                                 </destinationInterceptors>
> >
> >
> >
> >         <!--
> >
> >             The managementContext is used to configure how ActiveMQ is
> > exposed in
> >
> >             JMX. By default, ActiveMQ uses the MBean server that is
> > started by
> >
> >             the JVM. For more information, see:
> >
> >
> >
> >            http://activemq.apache.org/jmx.html
> >
> >         -->
> >
> >         <managementContext>
> >
> >             <managementContext createConnector="false"/>
> >
> >         </managementContext>
> >
> >
> >
> >         <!--
> >
> >             Configure message persistence for the broker. The default
> > persistence
> >
> >             mechanism is the KahaDB store (identified by the kahaDB tag).
> >
> >             For more information, see:
> >
> >
> >
> >             http://activemq.apache.org/persistence.html
> >
> >         -->
> >
> >         <persistenceAdapter>
> >
> >             <kahaDB directory="${activemq.data}/kahadb"/>
> >
> >         </persistenceAdapter>
> >
> >
> >
> >
> >
> >           <!--
> >
> >             The systemUsage controls the maximum amount of space the
> > broker will
> >
> >             use before disabling caching and/or slowing down producers.
> > For more information, see:
> >
> >             http://activemq.apache.org/producer-flow-control.html
> >
> >           -->
> >
> >           <systemUsage>
> >
> >             <systemUsage>
> >
> >                 <memoryUsage>
> >
> >                     <memoryUsage percentOfJvmHeap="70" />
> >
> >                 </memoryUsage>
> >
> >                 <storeUsage>
> >
> >                     <storeUsage limit="100 gb"/>
> >
> >                 </storeUsage>
> >
> >                 <tempUsage>
> >
> >                     <tempUsage limit="50 gb"/>
> >
> >                 </tempUsage>
> >
> >             </systemUsage>
> >
> >         </systemUsage>
> >
> >
> >
> >         <!--
> >
> >             The transport connectors expose ActiveMQ over a given
> > protocol to
> >
> >             clients and other brokers. For more information, see:
> >
> >
> >
> >             http://activemq.apache.org/configuring-transports.html
> >
> >         -->
> >
> >         <transportConnectors>
> >
> >             <!-- DOS protection, limit concurrent connections to 1000
> > and frame size to 100MB -->
> >
> >             <transportConnector name="openwire" uri="tcp://
> > 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> > 57600
> > "/>
> >
> >             <!--
> >
> >                                                 <transportConnector
> > name="amqp" uri="amqp://
> > 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485
> > 7600
> > "/>
> >
> >             <transportConnector name="stomp" uri="stomp://
> > 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> > 57600
> > "/>
> >
> >             <transportConnector name="mqtt" uri="mqtt://
> > 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485
> > 7600
> > "/>
> >
> >             <transportConnector name="ws" uri="ws://
> > 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> > 57600
> > "/>
> >
> >                                                 -->
> >
> >         </transportConnectors>
> >
> >
> >
> >         <!-- destroy the spring context on shutdown to stop jetty -->
> >
> >         <shutdownHooks>
> >
> >             <bean xmlns="http://www.springframework.org/schema/beans"
> > class="org.apache.activemq.hooks.SpringContextHook" />
> >
> >         </shutdownHooks>
> >
> >
> >
> >                                 <networkConnectors>
> >
> >                                                 <networkConnector
> > name="edf-to-thirdpartybroker-network-connector"
> >
> >                                                 useVirtualDestSubs="true"
> >
> >                                                 duplex="true"
> >
> >                                                 uri="static:(tcp://
> > 127.0.0.1:61616)" userName="omitted" password="omitted" >
> >
> >
> > <staticallyIncludedDestinations>
> >
> >
> > <topic physicalName="*sourceTopic*"/>
> >
> >
> > </staticallyIncludedDestinations>
> >
> >
> > <excludedDestinations>
> >
> >
> > <queue physicalName=">"/>
> >
> >
> > </excludedDestinations>
> >
> >
> >                                                 </networkConnector>
> >
> >                                 </networkConnectors>
> >
> >
> >
> >                                 <plugins>
> >
> >
> > <simpleAuthenticationPlugin>
> >
> >                                                   <users>
> >
> >
> > <authenticationUser username="system" password="omitted"
> > groups="users,admins"/>
> >
> >
> > <authenticationUser username="queueAUser" password="omitted"
> > groups="queueAusers"/>
> >
> >
> > <authenticationUser username="queueBUser" password="omitted"
> > groups="queueBusers"/>
> >
> >                                                   </users>
> >
> >
> > </simpleAuthenticationPlugin>
> >
> >
> >
> >                                                 <authorizationPlugin>
> >
> >         <map>
> >
> >           <authorizationMap>
> >
> >             <authorizationEntries>
> >
> >               <authorizationEntry queue=">" read="admins" write="admins"
> > admin="admins" />
> >
> >               <authorizationEntry queue="test" read="queueAusers"
> > write="queueAusers" admin="queueAUsers" />
> >
> >                                                   <authorizationEntry
> > queue="test2" read="queueBusers" write="queueBusers"
> > admin="queueBusers" />
> >
> >
> >
> >               <authorizationEntry topic=">" read="admins" write="admins"
> > admin="admins" />
> >
> >
> >
> >
> >
> >               <authorizationEntry topic="ActiveMQ.Advisory.>"
> > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> > admin="admins,queueAusers,queueBusers" />
> >
> >             </authorizationEntries>
> >
> >
> >
> >             <!-- let's assign roles to temporary destinations. comment
> > this entry if we don't want any roles assigned to temp destinations
> > -->
> >
> >
> >
> >           </authorizationMap>
> >
> >         </map>
> >
> >       </authorizationPlugin>
> >
> >
> >
> >
> >
> >                                 </plugins>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >                 </broker>
> >
> >
> >
> >     <!--
> >
> >         Enable web consoles, REST and Ajax APIs and demos
> >
> >         The web consoles requires by default login, you can disable
> > this in the jetty.xml file
> >
> >
> >
> >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more
> > details
> >
> >     -->
> >
> >     <import resource="jetty.xml"/>
> >
> >
> >
> > </beans>
> >
> > <!-- END SNIPPET: example -->
> >
> >
> >
> > *Richard Cannock*
> >
> > *Solution Architect*
> >
> > Business Change and IT
> >
> > Customers
> >
> >
> >
> > *T:* +44 (0) 2081862465
> >
> > *M:* +44 (0)7729 320505
> >
> >
> >
> > Barnwood
> >
> > Barnett Way
> >
> > Gloucester, Gloucestershire, GL4 3RS
> >
> >
> >
> > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
> >
> >
> >
> > edfenergy.com <http://www.edfenergy.com/>
> >
> >
> >
> > Please consider the environment before printing this email
> >
> >
> >
> > This e-mail and any files transmitted with it are confidential and may
> > be protected by legal privilege. If you are not the intended
> > recipient, please notify the sender and delete the e-mail from your
> system.
> > This e-mail has been scanned for malicious content but the internet is
> > inherently insecure and EDF Energy Limited can not accept any
> > liability for the integrity of this message or its attachments.
> > No employee or agent of EDF Energy Limited or any related company is
> > authorised to conclude any binding agreement on behalf of EDF Energy
> > Limited or any related company by e-mail.
> >
> > All e-mails sent and received by EDF Energy Limited are monitored to
> > ensure compliance with the company's information security policy.
> > Executable and script files are not permitted through the EDF Energy
> > Limited mail gateway.  EDF Energy does not accept or send mails above
> > 30 Mb in size.
> >
> > EDF Energy Limited
> > Registered in England and Wales No. 2366852 Registered Office: 90
> > Whitfield Street, London W1T 4EZ
> >
>
>
> This e-mail and any files transmitted with it are confidential and may be
> protected by legal privilege. If you are not the intended recipient, please
> notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is
> inherently insecure and EDF Energy Limited cannot accept any liability for
> the integrity of this message or its attachments. No employee or agent of
> EDF Energy Limited or any related company is authorised to conclude any
> binding agreement on behalf of EDF Energy Limited or any related company by
> e-mail.
> All e-mails sent and received by EDF Energy Limited are monitored to
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy
> Limited mail gateway. EDF Energy does not accept or send mails above 30 Mb
> in size.
>
> EDF Energy Limited Registered in England and Wales No. 2366852 Registered
> Office: 90 Whitfield Street, London W1T 4EZ
>

RE: compositeTopic and lost messages - broker restart

Posted by "Cannock, Richard" <Ri...@edfenergy.com>.
HI,

Further investigations indicate that the resulting network connector establishes only a non durable topic subscription to the sourceTopic via the CompositeTopic, which is why we do not get any messages queued up at source and subsequently delivered received post the networked broker restart.

However, this does not explain why this is only a non durable topic subscription, as I believe this should be a durable topic subscription by default. 

I have tried this with various ActiveMQ versions upto 5.15.9 and the behaviour is consistent.

Any advice?

Thanks

Richard

-----Original Message-----
From: Cannock, Richard <Ri...@edfenergy.com> 
Sent: 16 October 2019 19:36
To: users@activemq.apache.org
Subject: RE: compositeTopic and lost messages - broker restart

Apologies, mail server issue meant I wasn't sure that the original message had gotten through. Obviously it had!

These are the same issue so only looking for advice on one of them, thanks. 

_______
From: Justin Bertram [jbertram@apache.org]
Sent: 16 October 2019 17:09
To: users@activemq.apache.org
Subject: Re: compositeTopic and lost messages - broker restart

Richard, didn't you just send this same email to the ActiveMQ user list (although with a slightly different subject)? Just want to clarify if they are the same or different issues.


Justin

On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard < Richard.Cannock@edfenergy.com> wrote:

> Hi,
>
>
>
> We are trying to enlist in a network of brokers arrangement with a 
> third party broker hosted externally to us. We wish to receive 
> messages from a topic (called *sourceTopic*) and forward to internal 
> queues on our local broker for load balancing and throughput purposes 
> using virtual destinations, a compositeTopic forwarding to two local 
> queues on our local broker.
>
>
>
> In order to avoid implementing ingress firewall rules for the third 
> party onto our estate, we have created a duplex network connector so 
> that we initiate the outbound tcp connection. We have excluded our 
> local broker queues from propagating to the other broker using the 
> <excludedDestinations> configuration element.
>
>
>
> Our configuration below works fine, as long as our local broker is up, 
> and we have an active consumer. In this case, messages published to 
> the third party broker *sourceTopic* are successfully received on our 
> composite topic forwarded queues, test1 and test2.  To avoid any 
> confusion, the source message producer publishing to *sourceTopic *is 
> explicitly using PersistentDelivery (even though I believe that is the default anyway).
>
>
>
> If we restart our broker, any messages that were published to the
> *sourceTopic* in the intervening period that our broker was down, are 
> not received onto local forwarded queues test and test2.
>
>
>
> It would appear that the network connector configuration and\or 
> compositeTopic do not result in a durable topic subscription to the
> *sourceTopic* on the other broker.
>
>
>
> I can also successfully recreate this behaviour with two local 
> instances of Active MQ on my development machine running side by side 
> with broker B duplex connected to broker A, so it doesn't appear to be 
> an issue with the third party broker either.
>
>
>
> Our configuration (sensitive data removed) is as below:
>
>
>
> Any ideas on how we can resolve this?
>
>
>
> Many thanks
>
>
> Richard
>
>
>
> <beans
>
>   xmlns="http://www.springframework.org/schema/beans"
>
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>
>     <!-- Allows us to use system properties as variables in this 
> configuration file -->
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderCon
> figurer">
>
>         <property name="locations">
>
>             
> <value>file:${activemq.conf}/credentials.properties</value>
>
>         </property>
>
>     </bean>
>
>
>
>    <!-- Allows accessing the server log -->
>
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>
>           lazy-init="false" scope="singleton"
>
>           init-method="start" destroy-method="stop">
>
>     </bean>
>
>
>
>     <!--
>
>         The <broker> element is used to configure the ActiveMQ broker.
>
>     -->
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}"
> useVirtualDestSubs="true">
>
>
>
>         <destinationPolicy>
>
>             <policyMap>
>
>               <policyEntries>
>
>                 <policyEntry topic=">" >
>
>                     <!-- The constantPendingMessageLimitStrategy is 
> used to prevent
>
>                          slow topic consumers to block producers and 
> affect other consumers
>
>                          by limiting the number of messages that are 
> retained
>
>                          For more information, see:
>
>
>
>
> http://activemq.apache.org/slow-consumer-handling.html
>
>
>
>                     -->
>
>                   <pendingMessageLimitStrategy>
>
>                     <constantPendingMessageLimitStrategy 
> limit="1000"/>
>
>                   </pendingMessageLimitStrategy>
>
>                 </policyEntry>
>
>
> <policyEntry queue="test" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>
> <policyEntry topic="*sourceTopic*" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>               </policyEntries>
>
>             </policyMap>
>
>         </destinationPolicy>
>
>
>
>
>
>
>
>                                 <destinationInterceptors>
>
>
> <virtualDestinationInterceptor>
>
>                                                   
> <virtualDestinations>
>
>
> <compositeTopic forwardOnly="true" name="* sourceTopic*">
>
>
> <forwardTo>
>
>
> <queue physicalName="test" />
>
>
>                                                                 <queue 
> physicalName="test2" />
>
>
> </forwardTo>
>
>
> </compositeTopic>
>
>                                                   
> </virtualDestinations>
>
>
> </virtualDestinationInterceptor>
>
>                                 </destinationInterceptors>
>
>
>
>         <!--
>
>             The managementContext is used to configure how ActiveMQ is 
> exposed in
>
>             JMX. By default, ActiveMQ uses the MBean server that is 
> started by
>
>             the JVM. For more information, see:
>
>
>
>            http://activemq.apache.org/jmx.html
>
>         -->
>
>         <managementContext>
>
>             <managementContext createConnector="false"/>
>
>         </managementContext>
>
>
>
>         <!--
>
>             Configure message persistence for the broker. The default 
> persistence
>
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>
>             For more information, see:
>
>
>
>             http://activemq.apache.org/persistence.html
>
>         -->
>
>         <persistenceAdapter>
>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>
>         </persistenceAdapter>
>
>
>
>
>
>           <!--
>
>             The systemUsage controls the maximum amount of space the 
> broker will
>
>             use before disabling caching and/or slowing down producers.
> For more information, see:
>
>             http://activemq.apache.org/producer-flow-control.html
>
>           -->
>
>           <systemUsage>
>
>             <systemUsage>
>
>                 <memoryUsage>
>
>                     <memoryUsage percentOfJvmHeap="70" />
>
>                 </memoryUsage>
>
>                 <storeUsage>
>
>                     <storeUsage limit="100 gb"/>
>
>                 </storeUsage>
>
>                 <tempUsage>
>
>                     <tempUsage limit="50 gb"/>
>
>                 </tempUsage>
>
>             </systemUsage>
>
>         </systemUsage>
>
>
>
>         <!--
>
>             The transport connectors expose ActiveMQ over a given 
> protocol to
>
>             clients and other brokers. For more information, see:
>
>
>
>             http://activemq.apache.org/configuring-transports.html
>
>         -->
>
>         <transportConnectors>
>
>             <!-- DOS protection, limit concurrent connections to 1000 
> and frame size to 100MB -->
>
>             <transportConnector name="openwire" uri="tcp://
> 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> 57600
> "/>
>
>             <!--
>
>                                                 <transportConnector 
> name="amqp" uri="amqp://
> 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485
> 7600
> "/>
>
>             <transportConnector name="stomp" uri="stomp://
> 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> 57600
> "/>
>
>             <transportConnector name="mqtt" uri="mqtt://
> 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485
> 7600
> "/>
>
>             <transportConnector name="ws" uri="ws://
> 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
> 57600
> "/>
>
>                                                 -->
>
>         </transportConnectors>
>
>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>
>         <shutdownHooks>
>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>
>         </shutdownHooks>
>
>
>
>                                 <networkConnectors>
>
>                                                 <networkConnector 
> name="edf-to-thirdpartybroker-network-connector"
>
>                                                 useVirtualDestSubs="true"
>
>                                                 duplex="true"
>
>                                                 uri="static:(tcp:// 
> 127.0.0.1:61616)" userName="omitted" password="omitted" >
>
>
> <staticallyIncludedDestinations>
>
>
> <topic physicalName="*sourceTopic*"/>
>
>
> </staticallyIncludedDestinations>
>
>
> <excludedDestinations>
>
>
> <queue physicalName=">"/>
>
>
> </excludedDestinations>
>
>
>                                                 </networkConnector>
>
>                                 </networkConnectors>
>
>
>
>                                 <plugins>
>
>
> <simpleAuthenticationPlugin>
>
>                                                   <users>
>
>
> <authenticationUser username="system" password="omitted"
> groups="users,admins"/>
>
>
> <authenticationUser username="queueAUser" password="omitted"
> groups="queueAusers"/>
>
>
> <authenticationUser username="queueBUser" password="omitted"
> groups="queueBusers"/>
>
>                                                   </users>
>
>
> </simpleAuthenticationPlugin>
>
>
>
>                                                 <authorizationPlugin>
>
>         <map>
>
>           <authorizationMap>
>
>             <authorizationEntries>
>
>               <authorizationEntry queue=">" read="admins" write="admins"
> admin="admins" />
>
>               <authorizationEntry queue="test" read="queueAusers"
> write="queueAusers" admin="queueAUsers" />
>
>                                                   <authorizationEntry 
> queue="test2" read="queueBusers" write="queueBusers" 
> admin="queueBusers" />
>
>
>
>               <authorizationEntry topic=">" read="admins" write="admins"
> admin="admins" />
>
>
>
>
>
>               <authorizationEntry topic="ActiveMQ.Advisory.>"
> read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> admin="admins,queueAusers,queueBusers" />
>
>             </authorizationEntries>
>
>
>
>             <!-- let's assign roles to temporary destinations. comment 
> this entry if we don't want any roles assigned to temp destinations  
> -->
>
>
>
>           </authorizationMap>
>
>         </map>
>
>       </authorizationPlugin>
>
>
>
>
>
>                                 </plugins>
>
>
>
>
>
>
>
>
>
>                 </broker>
>
>
>
>     <!--
>
>         Enable web consoles, REST and Ajax APIs and demos
>
>         The web consoles requires by default login, you can disable 
> this in the jetty.xml file
>
>
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more 
> details
>
>     -->
>
>     <import resource="jetty.xml"/>
>
>
>
> </beans>
>
> <!-- END SNIPPET: example -->
>
>
>
> *Richard Cannock*
>
> *Solution Architect*
>
> Business Change and IT
>
> Customers
>
>
>
> *T:* +44 (0) 2081862465
>
> *M:* +44 (0)7729 320505
>
>
>
> Barnwood
>
> Barnett Way
>
> Gloucester, Gloucestershire, GL4 3RS
>
>
>
> [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
>
>
>
> edfenergy.com <http://www.edfenergy.com/>
>
>
>
> Please consider the environment before printing this email
>
>
>
> This e-mail and any files transmitted with it are confidential and may 
> be protected by legal privilege. If you are not the intended 
> recipient, please notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is 
> inherently insecure and EDF Energy Limited can not accept any 
> liability for the integrity of this message or its attachments.
> No employee or agent of EDF Energy Limited or any related company is 
> authorised to conclude any binding agreement on behalf of EDF Energy 
> Limited or any related company by e-mail.
>
> All e-mails sent and received by EDF Energy Limited are monitored to 
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy 
> Limited mail gateway.  EDF Energy does not accept or send mails above 
> 30 Mb in size.
>
> EDF Energy Limited
> Registered in England and Wales No. 2366852 Registered Office: 90 
> Whitfield Street, London W1T 4EZ
>


This e-mail and any files transmitted with it are confidential and may be protected by legal privilege. If you are not the intended recipient, please notify the sender and delete the e-mail from your system.
This e-mail has been scanned for malicious content but the internet is inherently insecure and EDF Energy Limited cannot accept any liability for the integrity of this message or its attachments. No employee or agent of EDF Energy Limited or any related company is authorised to conclude any binding agreement on behalf of EDF Energy Limited or any related company by e-mail.
All e-mails sent and received by EDF Energy Limited are monitored to ensure compliance with the company's information security policy. Executable and script files are not permitted through the EDF Energy Limited mail gateway. EDF Energy does not accept or send mails above 30 Mb in size.

EDF Energy Limited Registered in England and Wales No. 2366852 Registered Office: 90 Whitfield Street, London W1T 4EZ

RE: compositeTopic and lost messages - broker restart

Posted by "Cannock, Richard" <Ri...@edfenergy.com>.
Apologies, mail server issue meant I wasn't sure that the original message had gotten through. Obviously it had!

These are the same issue so only looking for advice on one of them, thanks. 

_______
From: Justin Bertram [jbertram@apache.org]
Sent: 16 October 2019 17:09
To: users@activemq.apache.org
Subject: Re: compositeTopic and lost messages - broker restart

Richard, didn't you just send this same email to the ActiveMQ user list
(although with a slightly different subject)? Just want to clarify if they
are the same or different issues.


Justin

On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
Richard.Cannock@edfenergy.com> wrote:

> Hi,
>
>
>
> We are trying to enlist in a network of brokers arrangement with a third
> party broker hosted externally to us. We wish to receive messages from a
> topic (called *sourceTopic*) and forward to internal queues on our local
> broker for load balancing and throughput purposes using virtual
> destinations, a compositeTopic forwarding to two local queues on our local
> broker.
>
>
>
> In order to avoid implementing ingress firewall rules for the third party
> onto our estate, we have created a duplex network connector so that we
> initiate the outbound tcp connection. We have excluded our local broker
> queues from propagating to the other broker using the
> <excludedDestinations> configuration element.
>
>
>
> Our configuration below works fine, as long as our local broker is up, and
> we have an active consumer. In this case, messages published to the third
> party broker *sourceTopic* are successfully received on our composite
> topic forwarded queues, test1 and test2.  To avoid any confusion, the
> source message producer publishing to *sourceTopic *is explicitly using
> PersistentDelivery (even though I believe that is the default anyway).
>
>
>
> If we restart our broker, any messages that were published to the
> *sourceTopic* in the intervening period that our broker was down, are not
> received onto local forwarded queues test and test2.
>
>
>
> It would appear that the network connector configuration and\or
> compositeTopic do not result in a durable topic subscription to the
> *sourceTopic* on the other broker.
>
>
>
> I can also successfully recreate this behaviour with two local instances
> of Active MQ on my development machine running side by side with broker B
> duplex connected to broker A, so it doesn’t appear to be an issue with the
> third party broker either.
>
>
>
> Our configuration (sensitive data removed) is as below:
>
>
>
> Any ideas on how we can resolve this?
>
>
>
> Many thanks
>
>
> Richard
>
>
>
> <beans
>
>   xmlns="http://www.springframework.org/schema/beans"
>
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>
>     <!-- Allows us to use system properties as variables in this
> configuration file -->
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>
>         <property name="locations">
>
>             <value>file:${activemq.conf}/credentials.properties</value>
>
>         </property>
>
>     </bean>
>
>
>
>    <!-- Allows accessing the server log -->
>
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>
>           lazy-init="false" scope="singleton"
>
>           init-method="start" destroy-method="stop">
>
>     </bean>
>
>
>
>     <!--
>
>         The <broker> element is used to configure the ActiveMQ broker.
>
>     -->
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}"
> useVirtualDestSubs="true">
>
>
>
>         <destinationPolicy>
>
>             <policyMap>
>
>               <policyEntries>
>
>                 <policyEntry topic=">" >
>
>                     <!-- The constantPendingMessageLimitStrategy is used
> to prevent
>
>                          slow topic consumers to block producers and
> affect other consumers
>
>                          by limiting the number of messages that are
> retained
>
>                          For more information, see:
>
>
>
>
> http://activemq.apache.org/slow-consumer-handling.html
>
>
>
>                     -->
>
>                   <pendingMessageLimitStrategy>
>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>
>                   </pendingMessageLimitStrategy>
>
>                 </policyEntry>
>
>
> <policyEntry queue="test" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>
> <policyEntry topic="*sourceTopic*" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>               </policyEntries>
>
>             </policyMap>
>
>         </destinationPolicy>
>
>
>
>
>
>
>
>                                 <destinationInterceptors>
>
>
> <virtualDestinationInterceptor>
>
>                                                   <virtualDestinations>
>
>
> <compositeTopic forwardOnly="true" name="* sourceTopic*">
>
>
> <forwardTo>
>
>
> <queue physicalName="test" />
>
>
>                                                                 <queue
> physicalName="test2" />
>
>
> </forwardTo>
>
>
> </compositeTopic>
>
>                                                   </virtualDestinations>
>
>
> </virtualDestinationInterceptor>
>
>                                 </destinationInterceptors>
>
>
>
>         <!--
>
>             The managementContext is used to configure how ActiveMQ is
> exposed in
>
>             JMX. By default, ActiveMQ uses the MBean server that is
> started by
>
>             the JVM. For more information, see:
>
>
>
>            http://activemq.apache.org/jmx.html
>
>         -->
>
>         <managementContext>
>
>             <managementContext createConnector="false"/>
>
>         </managementContext>
>
>
>
>         <!--
>
>             Configure message persistence for the broker. The default
> persistence
>
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>
>             For more information, see:
>
>
>
>             http://activemq.apache.org/persistence.html
>
>         -->
>
>         <persistenceAdapter>
>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>
>         </persistenceAdapter>
>
>
>
>
>
>           <!--
>
>             The systemUsage controls the maximum amount of space the
> broker will
>
>             use before disabling caching and/or slowing down producers.
> For more information, see:
>
>             http://activemq.apache.org/producer-flow-control.html
>
>           -->
>
>           <systemUsage>
>
>             <systemUsage>
>
>                 <memoryUsage>
>
>                     <memoryUsage percentOfJvmHeap="70" />
>
>                 </memoryUsage>
>
>                 <storeUsage>
>
>                     <storeUsage limit="100 gb"/>
>
>                 </storeUsage>
>
>                 <tempUsage>
>
>                     <tempUsage limit="50 gb"/>
>
>                 </tempUsage>
>
>             </systemUsage>
>
>         </systemUsage>
>
>
>
>         <!--
>
>             The transport connectors expose ActiveMQ over a given protocol
> to
>
>             clients and other brokers. For more information, see:
>
>
>
>             http://activemq.apache.org/configuring-transports.html
>
>         -->
>
>         <transportConnectors>
>
>             <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
>
>             <transportConnector name="openwire" uri="tcp://
> 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <!--
>
>                                                 <transportConnector
> name="amqp" uri="amqp://
> 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="stomp" uri="stomp://
> 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="mqtt" uri="mqtt://
> 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="ws" uri="ws://
> 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>                                                 -->
>
>         </transportConnectors>
>
>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>
>         <shutdownHooks>
>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>
>         </shutdownHooks>
>
>
>
>                                 <networkConnectors>
>
>                                                 <networkConnector
> name="edf-to-thirdpartybroker-network-connector"
>
>                                                 useVirtualDestSubs="true"
>
>                                                 duplex="true"
>
>                                                 uri="static:(tcp://
> 127.0.0.1:61616)" userName="omitted" password="omitted" >
>
>
> <staticallyIncludedDestinations>
>
>
> <topic physicalName="*sourceTopic*"/>
>
>
> </staticallyIncludedDestinations>
>
>
> <excludedDestinations>
>
>
> <queue physicalName=">"/>
>
>
> </excludedDestinations>
>
>
>                                                 </networkConnector>
>
>                                 </networkConnectors>
>
>
>
>                                 <plugins>
>
>
> <simpleAuthenticationPlugin>
>
>                                                   <users>
>
>
> <authenticationUser username="system" password="omitted"
> groups="users,admins"/>
>
>
> <authenticationUser username="queueAUser" password="omitted"
> groups="queueAusers"/>
>
>
> <authenticationUser username="queueBUser" password="omitted"
> groups="queueBusers"/>
>
>                                                   </users>
>
>
> </simpleAuthenticationPlugin>
>
>
>
>                                                 <authorizationPlugin>
>
>         <map>
>
>           <authorizationMap>
>
>             <authorizationEntries>
>
>               <authorizationEntry queue=">" read="admins" write="admins"
> admin="admins" />
>
>               <authorizationEntry queue="test" read="queueAusers"
> write="queueAusers" admin="queueAUsers" />
>
>                                                   <authorizationEntry
> queue="test2" read="queueBusers" write="queueBusers" admin="queueBusers" />
>
>
>
>               <authorizationEntry topic=">" read="admins" write="admins"
> admin="admins" />
>
>
>
>
>
>               <authorizationEntry topic="ActiveMQ.Advisory.>"
> read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> admin="admins,queueAusers,queueBusers" />
>
>             </authorizationEntries>
>
>
>
>             <!-- let's assign roles to temporary destinations. comment
> this entry if we don't want any roles assigned to temp destinations  -->
>
>
>
>           </authorizationMap>
>
>         </map>
>
>       </authorizationPlugin>
>
>
>
>
>
>                                 </plugins>
>
>
>
>
>
>
>
>
>
>                 </broker>
>
>
>
>     <!--
>
>         Enable web consoles, REST and Ajax APIs and demos
>
>         The web consoles requires by default login, you can disable this
> in the jetty.xml file
>
>
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>
>     -->
>
>     <import resource="jetty.xml"/>
>
>
>
> </beans>
>
> <!-- END SNIPPET: example -->
>
>
>
> *Richard Cannock*
>
> *Solution Architect*
>
> Business Change and IT
>
> Customers
>
>
>
> *T:* +44 (0) 2081862465
>
> *M:* +44 (0)7729 320505
>
>
>
> Barnwood
>
> Barnett Way
>
> Gloucester, Gloucestershire, GL4 3RS
>
>
>
> [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
>
>
>
> edfenergy.com <http://www.edfenergy.com/>
>
>
>
> Please consider the environment before printing this email
>
>
>
> This e-mail and any files transmitted with it are confidential and may be
> protected by legal privilege. If you are not the intended recipient, please
> notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is
> inherently insecure and EDF Energy Limited can not accept any liability for
> the integrity of this message or its attachments.
> No employee or agent of EDF Energy Limited or any related company is
> authorised to conclude any binding agreement on behalf of EDF Energy
> Limited or any related company by e-mail.
>
> All e-mails sent and received by EDF Energy Limited are monitored to
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy
> Limited mail gateway.  EDF Energy does not accept or send mails above 30 Mb
> in size.
>
> EDF Energy Limited
> Registered in England and Wales No. 2366852
> Registered Office: 90 Whitfield Street, London W1T 4EZ
>


This e-mail and any files transmitted with it are confidential and may be protected by legal privilege. If you are not the intended recipient, please notify the sender and delete the e-mail from your system.
This e-mail has been scanned for malicious content but the internet is inherently insecure and EDF Energy Limited cannot accept any liability for the integrity of this message or its attachments. No employee or agent of EDF Energy Limited or any related company is authorised to conclude any binding agreement on behalf of EDF Energy Limited or any related company by e-mail.
All e-mails sent and received by EDF Energy Limited are monitored to ensure compliance with the company's information security policy. Executable and script files are not permitted through the EDF Energy Limited mail gateway. EDF Energy does not accept or send mails above 30 Mb in size.

EDF Energy Limited Registered in England and Wales No. 2366852 Registered Office: 90 Whitfield Street, London W1T 4EZ

Re: compositeTopic and lost messages - broker restart

Posted by Justin Bertram <jb...@apache.org>.
Richard, didn't you just send this same email to the ActiveMQ user list
(although with a slightly different subject)? Just want to clarify if they
are the same or different issues.


Justin

On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
Richard.Cannock@edfenergy.com> wrote:

> Hi,
>
>
>
> We are trying to enlist in a network of brokers arrangement with a third
> party broker hosted externally to us. We wish to receive messages from a
> topic (called *sourceTopic*) and forward to internal queues on our local
> broker for load balancing and throughput purposes using virtual
> destinations, a compositeTopic forwarding to two local queues on our local
> broker.
>
>
>
> In order to avoid implementing ingress firewall rules for the third party
> onto our estate, we have created a duplex network connector so that we
> initiate the outbound tcp connection. We have excluded our local broker
> queues from propagating to the other broker using the
> <excludedDestinations> configuration element.
>
>
>
> Our configuration below works fine, as long as our local broker is up, and
> we have an active consumer. In this case, messages published to the third
> party broker *sourceTopic* are successfully received on our composite
> topic forwarded queues, test1 and test2.  To avoid any confusion, the
> source message producer publishing to *sourceTopic *is explicitly using
> PersistentDelivery (even though I believe that is the default anyway).
>
>
>
> If we restart our broker, any messages that were published to the
> *sourceTopic* in the intervening period that our broker was down, are not
> received onto local forwarded queues test and test2.
>
>
>
> It would appear that the network connector configuration and\or
> compositeTopic do not result in a durable topic subscription to the
> *sourceTopic* on the other broker.
>
>
>
> I can also successfully recreate this behaviour with two local instances
> of Active MQ on my development machine running side by side with broker B
> duplex connected to broker A, so it doesn’t appear to be an issue with the
> third party broker either.
>
>
>
> Our configuration (sensitive data removed) is as below:
>
>
>
> Any ideas on how we can resolve this?
>
>
>
> Many thanks
>
>
> Richard
>
>
>
> <beans
>
>   xmlns="http://www.springframework.org/schema/beans"
>
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>
>     <!-- Allows us to use system properties as variables in this
> configuration file -->
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>
>         <property name="locations">
>
>             <value>file:${activemq.conf}/credentials.properties</value>
>
>         </property>
>
>     </bean>
>
>
>
>    <!-- Allows accessing the server log -->
>
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>
>           lazy-init="false" scope="singleton"
>
>           init-method="start" destroy-method="stop">
>
>     </bean>
>
>
>
>     <!--
>
>         The <broker> element is used to configure the ActiveMQ broker.
>
>     -->
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}"
> useVirtualDestSubs="true">
>
>
>
>         <destinationPolicy>
>
>             <policyMap>
>
>               <policyEntries>
>
>                 <policyEntry topic=">" >
>
>                     <!-- The constantPendingMessageLimitStrategy is used
> to prevent
>
>                          slow topic consumers to block producers and
> affect other consumers
>
>                          by limiting the number of messages that are
> retained
>
>                          For more information, see:
>
>
>
>
> http://activemq.apache.org/slow-consumer-handling.html
>
>
>
>                     -->
>
>                   <pendingMessageLimitStrategy>
>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>
>                   </pendingMessageLimitStrategy>
>
>                 </policyEntry>
>
>
> <policyEntry queue="test" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>
> <policyEntry topic="*sourceTopic*" enableAudit="false">
>
>
> <networkBridgeFilterFactory>
>
>
> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>
>
> </networkBridgeFilterFactory>
>
>
> </policyEntry>
>
>               </policyEntries>
>
>             </policyMap>
>
>         </destinationPolicy>
>
>
>
>
>
>
>
>                                 <destinationInterceptors>
>
>
> <virtualDestinationInterceptor>
>
>                                                   <virtualDestinations>
>
>
> <compositeTopic forwardOnly="true" name="* sourceTopic*">
>
>
> <forwardTo>
>
>
> <queue physicalName="test" />
>
>
>                                                                 <queue
> physicalName="test2" />
>
>
> </forwardTo>
>
>
> </compositeTopic>
>
>                                                   </virtualDestinations>
>
>
> </virtualDestinationInterceptor>
>
>                                 </destinationInterceptors>
>
>
>
>         <!--
>
>             The managementContext is used to configure how ActiveMQ is
> exposed in
>
>             JMX. By default, ActiveMQ uses the MBean server that is
> started by
>
>             the JVM. For more information, see:
>
>
>
>            http://activemq.apache.org/jmx.html
>
>         -->
>
>         <managementContext>
>
>             <managementContext createConnector="false"/>
>
>         </managementContext>
>
>
>
>         <!--
>
>             Configure message persistence for the broker. The default
> persistence
>
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>
>             For more information, see:
>
>
>
>             http://activemq.apache.org/persistence.html
>
>         -->
>
>         <persistenceAdapter>
>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>
>         </persistenceAdapter>
>
>
>
>
>
>           <!--
>
>             The systemUsage controls the maximum amount of space the
> broker will
>
>             use before disabling caching and/or slowing down producers.
> For more information, see:
>
>             http://activemq.apache.org/producer-flow-control.html
>
>           -->
>
>           <systemUsage>
>
>             <systemUsage>
>
>                 <memoryUsage>
>
>                     <memoryUsage percentOfJvmHeap="70" />
>
>                 </memoryUsage>
>
>                 <storeUsage>
>
>                     <storeUsage limit="100 gb"/>
>
>                 </storeUsage>
>
>                 <tempUsage>
>
>                     <tempUsage limit="50 gb"/>
>
>                 </tempUsage>
>
>             </systemUsage>
>
>         </systemUsage>
>
>
>
>         <!--
>
>             The transport connectors expose ActiveMQ over a given protocol
> to
>
>             clients and other brokers. For more information, see:
>
>
>
>             http://activemq.apache.org/configuring-transports.html
>
>         -->
>
>         <transportConnectors>
>
>             <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
>
>             <transportConnector name="openwire" uri="tcp://
> 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <!--
>
>                                                 <transportConnector
> name="amqp" uri="amqp://
> 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="stomp" uri="stomp://
> 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="mqtt" uri="mqtt://
> 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>             <transportConnector name="ws" uri="ws://
> 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> "/>
>
>                                                 -->
>
>         </transportConnectors>
>
>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>
>         <shutdownHooks>
>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>
>         </shutdownHooks>
>
>
>
>                                 <networkConnectors>
>
>                                                 <networkConnector
> name="edf-to-thirdpartybroker-network-connector"
>
>                                                 useVirtualDestSubs="true"
>
>                                                 duplex="true"
>
>                                                 uri="static:(tcp://
> 127.0.0.1:61616)" userName="omitted" password="omitted" >
>
>
> <staticallyIncludedDestinations>
>
>
> <topic physicalName="*sourceTopic*"/>
>
>
> </staticallyIncludedDestinations>
>
>
> <excludedDestinations>
>
>
> <queue physicalName=">"/>
>
>
> </excludedDestinations>
>
>
>                                                 </networkConnector>
>
>                                 </networkConnectors>
>
>
>
>                                 <plugins>
>
>
> <simpleAuthenticationPlugin>
>
>                                                   <users>
>
>
> <authenticationUser username="system" password="omitted"
> groups="users,admins"/>
>
>
> <authenticationUser username="queueAUser" password="omitted"
> groups="queueAusers"/>
>
>
> <authenticationUser username="queueBUser" password="omitted"
> groups="queueBusers"/>
>
>                                                   </users>
>
>
> </simpleAuthenticationPlugin>
>
>
>
>                                                 <authorizationPlugin>
>
>         <map>
>
>           <authorizationMap>
>
>             <authorizationEntries>
>
>               <authorizationEntry queue=">" read="admins" write="admins"
> admin="admins" />
>
>               <authorizationEntry queue="test" read="queueAusers"
> write="queueAusers" admin="queueAUsers" />
>
>                                                   <authorizationEntry
> queue="test2" read="queueBusers" write="queueBusers" admin="queueBusers" />
>
>
>
>               <authorizationEntry topic=">" read="admins" write="admins"
> admin="admins" />
>
>
>
>
>
>               <authorizationEntry topic="ActiveMQ.Advisory.>"
> read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> admin="admins,queueAusers,queueBusers" />
>
>             </authorizationEntries>
>
>
>
>             <!-- let's assign roles to temporary destinations. comment
> this entry if we don't want any roles assigned to temp destinations  -->
>
>
>
>           </authorizationMap>
>
>         </map>
>
>       </authorizationPlugin>
>
>
>
>
>
>                                 </plugins>
>
>
>
>
>
>
>
>
>
>                 </broker>
>
>
>
>     <!--
>
>         Enable web consoles, REST and Ajax APIs and demos
>
>         The web consoles requires by default login, you can disable this
> in the jetty.xml file
>
>
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>
>     -->
>
>     <import resource="jetty.xml"/>
>
>
>
> </beans>
>
> <!-- END SNIPPET: example -->
>
>
>
> *Richard Cannock*
>
> *Solution Architect*
>
> Business Change and IT
>
> Customers
>
>
>
> *T:* +44 (0) 2081862465
>
> *M:* +44 (0)7729 320505
>
>
>
> Barnwood
>
> Barnett Way
>
> Gloucester, Gloucestershire, GL4 3RS
>
>
>
> [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
>
>
>
> edfenergy.com <http://www.edfenergy.com/>
>
>
>
> Please consider the environment before printing this email
>
>
>
> This e-mail and any files transmitted with it are confidential and may be
> protected by legal privilege. If you are not the intended recipient, please
> notify the sender and delete the e-mail from your system.
> This e-mail has been scanned for malicious content but the internet is
> inherently insecure and EDF Energy Limited can not accept any liability for
> the integrity of this message or its attachments.
> No employee or agent of EDF Energy Limited or any related company is
> authorised to conclude any binding agreement on behalf of EDF Energy
> Limited or any related company by e-mail.
>
> All e-mails sent and received by EDF Energy Limited are monitored to
> ensure compliance with the company's information security policy.
> Executable and script files are not permitted through the EDF Energy
> Limited mail gateway.  EDF Energy does not accept or send mails above 30 Mb
> in size.
>
> EDF Energy Limited
> Registered in England and Wales No. 2366852
> Registered Office: 90 Whitfield Street, London W1T 4EZ
>