Posted to issues@activemq.apache.org by "Márcio Monteiro (JIRA)" <ji...@apache.org> on 2018/09/13 18:46:00 UTC

[jira] [Created] (AMQ-7051) WebConsole is adding a wrong cache selector to virtualSelectorCacheBrokerPlugin

Márcio Monteiro created AMQ-7051:
------------------------------------

             Summary: WebConsole is adding a wrong cache selector to virtualSelectorCacheBrokerPlugin
                 Key: AMQ-7051
                 URL: https://issues.apache.org/jira/browse/AMQ-7051
             Project: ActiveMQ
          Issue Type: Bug
          Components: Selector
    Affects Versions: 5.15.6
         Environment: {code:xml}
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>

                <!-- NEW -->
                <policyEntry queue=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
        
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- NEW -->
        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="true" />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>

        <!-- NEW -->
        <plugins>
            <virtualSelectorCacheBrokerPlugin persistFile="${activemq.data}/kahadb/selectorcache.data"/>
        </plugins>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->{code}
            Reporter: Márcio Monteiro


We have a topic called VirtualTopic.document and four nodes (n1, n2, n3 and n4), each listening to its own queue created by the VirtualTopic strategy:
 * Queue n1: Consumer.n1.VirtualTopic.document
 * Queue n2: Consumer.n2.VirtualTopic.document
 * Queue n3: Consumer.n3.VirtualTopic.document
 * Queue n4: Consumer.n4.VirtualTopic.document

Each node subscribes with its own selector, so that AMQ knows which queues each message should be forwarded to (selectorAware="true" on the virtual topic). To avoid losing messages while clients are offline, we adopted the virtualSelectorCacheBrokerPlugin.

Up to this point it works well. If I send a message matching the selectors of n1 and n2, the message is forwarded only to the correct queues (Consumer.n1.VirtualTopic.document and Consumer.n2.VirtualTopic.document).

The problem: if I open the web console page that lists the active consumers of queue Consumer.n3.VirtualTopic.document ( [http://localhost:8161/admin/queueConsumers.jsp?JMSDestination=Consumer.n3.VirtualTopic.document|http://192.168.132.112:8161/admin/queueConsumers.jsp?JMSDestination=Consumer.n3.VirtualTopic.document] ) and then send a message whose selector matches only other nodes (n1, n2 or n4), the message is also delivered to the unwanted queue Consumer.n3.VirtualTopic.document.

I also noticed that merely opening the web console page that lists the active consumers of a queue is enough to change the binary content of the persistFile of virtualSelectorCacheBrokerPlugin. Disabling virtualSelectorCacheBrokerPlugin also seems to fix the problem, but then we start to lose messages for offline nodes, which is unacceptable in our scenario.
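
The suspected mechanism can be illustrated with a toy model in plain Java. This is only a sketch under assumptions, not ActiveMQ code: the class SelectorCacheSketch, its methods, the MATCH_ALL constant and the "node" message property are all hypothetical names invented for the illustration, and the real selectors used by n1..n4 are not shown in this report. The model assumes that a consumer attaching without a selector is remembered in the cache as "matches everything", which would explain why viewing a queue's consumers changes the persisted cache and later routing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: none of these types or methods are ActiveMQ classes.
class SelectorCacheSketch {

    // Hypothetical stand-in for a selector that matches every message.
    static final String MATCH_ALL = "TRUE";

    // Queue name -> selectors remembered for that queue
    // (a stand-in for what the plugin would keep in its persistFile).
    private final Map<String, Set<String>> cache = new TreeMap<>();

    // Called whenever any consumer attaches to a queue. ASSUMPTION: a
    // consumer without a selector is recorded as MATCH_ALL.
    void consumerAttached(String queue, String selector) {
        cache.computeIfAbsent(queue, q -> new TreeSet<>())
             .add(selector == null ? MATCH_ALL : selector);
    }

    // Returns the queues that would receive a message whose hypothetical
    // "node" property has the given value, in sorted queue-name order.
    List<String> route(String node) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : cache.entrySet()) {
            for (String selector : entry.getValue()) {
                if (matches(selector, node)) {
                    targets.add(entry.getKey());
                    break; // one matching cached selector is enough
                }
            }
        }
        return targets;
    }

    // The toy understands only MATCH_ALL and selectors of the form node = 'x'.
    static boolean matches(String selector, String node) {
        return MATCH_ALL.equals(selector) || selector.equals("node = '" + node + "'");
    }

    public static void main(String[] args) {
        SelectorCacheSketch broker = new SelectorCacheSketch();
        for (String n : new String[] {"n1", "n2", "n3", "n4"}) {
            broker.consumerAttached("Consumer." + n + ".VirtualTopic.document",
                                    "node = '" + n + "'");
        }
        // Before the extra consumer: only the n1 queue is selected.
        System.out.println(broker.route("n1"));

        // A selector-less consumer attaches to the n3 queue.
        broker.consumerAttached("Consumer.n3.VirtualTopic.document", null);

        // Now the n3 queue is selected as well, for every message.
        System.out.println(broker.route("n1"));
    }
}
```

If the real plugin behaves like this model, the unwanted entry would stay in the persisted cache after the web console page is closed, so Consumer.n3.VirtualTopic.document would keep receiving every message until the entry is removed.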

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)