Posted to issues@activemq.apache.org by "Márcio Monteiro (JIRA)" <ji...@apache.org> on 2018/09/13 18:47:00 UTC

[jira] [Updated] (AMQ-7051) WebConsole is adding a wrong cache selector to virtualSelectorCacheBrokerPlugin

     [ https://issues.apache.org/jira/browse/AMQ-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Márcio Monteiro updated AMQ-7051:
---------------------------------
    Attachment: activemq.xml

> WebConsole is adding a wrong cache selector to virtualSelectorCacheBrokerPlugin
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-7051
>                 URL: https://issues.apache.org/jira/browse/AMQ-7051
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Selector
>    Affects Versions: 5.15.6
>         Environment: {code:java}
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
>    <!-- Allows accessing the server log -->
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>           lazy-init="false" scope="singleton"
>           init-method="start" destroy-method="stop">
>     </bean>
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}">
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" >
>                     <!-- The constantPendingMessageLimitStrategy is used to prevent
>                          slow topic consumers to block producers and affect other consumers
>                          by limiting the number of messages that are retained
>                          For more information, see:
>                          http://activemq.apache.org/slow-consumer-handling.html
>                     -->
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <!-- NEW -->
>                 <policyEntry queue=">" >
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>         
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <!-- NEW -->
>         <destinationInterceptors>
>             <virtualDestinationInterceptor>
>                 <virtualDestinations>
>                     <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="true" />
>                 </virtualDestinations>
>             </virtualDestinationInterceptor>
>         </destinationInterceptors>
>         <!-- NEW -->
>         <plugins>
>             <virtualSelectorCacheBrokerPlugin persistFile="${activemq.data}/kahadb/selectorcache.data"/>
>         </plugins>
>         <!--
>             The managementContext is used to configure how ActiveMQ is exposed in
>             JMX. By default, ActiveMQ uses the MBean server that is started by
>             the JVM. For more information, see:
>             http://activemq.apache.org/jmx.html
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>         <!--
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>             For more information, see:
>             http://activemq.apache.org/persistence.html
>         -->
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>         </persistenceAdapter>
>         <!--
>             The systemUsage controls the maximum amount of space the broker will
>             use before disabling caching and/or slowing down producers. For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage percentOfJvmHeap="70" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="100 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="50 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!--
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see:
>             http://activemq.apache.org/configuring-transports.html
>         -->
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>         </transportConnectors>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>     </broker>
>     <!--
>         Enable web consoles, REST and Ajax APIs and demos
>         The web consoles requires by default login, you can disable this in the jetty.xml file
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>     -->
>     <import resource="jetty.xml"/>
> </beans>
> <!-- END SNIPPET: example -->{code}
>            Reporter: Márcio Monteiro
>            Priority: Critical
>         Attachments: activemq.xml
>
>
> We have a topic called VirtualTopic.document and four nodes (n1, n2, n3 and n4), each listening to its own queue created by the VirtualTopic strategy:
>  * Queue n1: Consumer.n1.VirtualTopic.document
>  * Queue n2: Consumer.n2.VirtualTopic.document
>  * Queue n3: Consumer.n3.VirtualTopic.document
>  * Queue n4: Consumer.n4.VirtualTopic.document
> Each node has its own selector, so that AMQ knows where to forward each message (selectorAware=true on the virtual topic). To avoid losing messages while clients are offline, we adopted the virtualSelectorCacheBrokerPlugin.
> So far this works well: if I send a message matching the selectors of n1 and n2, it is forwarded to the correct queues (Consumer.n1.VirtualTopic.document and Consumer.n2.VirtualTopic.document).
> The problem: if I open the web console to see the active consumers on queue Consumer.n3.VirtualTopic.document ( [http://localhost:8161/admin/queueConsumers.jsp?JMSDestination=Consumer.n3.VirtualTopic.document|http://192.168.132.112:8161/admin/queueConsumers.jsp?JMSDestination=Consumer.n3.VirtualTopic.document] ) and then send a message to any other node (n1, n2 or n4), the message is also delivered to the unwanted queue Consumer.n3.VirtualTopic.document.
> I also noticed that merely browsing the web console to check the active consumers of a queue is enough to change the binary content of the persistFile of virtualSelectorCacheBrokerPlugin. Disabling virtualSelectorCacheBrokerPlugin also seems to fix the problem, but then we start to lose messages for offline nodes, which is unacceptable in our scenario.
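
The reported behaviour can be pictured with a small stand-alone model. This is a sketch only, not ActiveMQ code: the class `SelectorCacheModel`, its methods, and the `MATCH_ALL` marker are hypothetical names, and selectors are reduced to plain node names. It also assumes, purely for illustration, that the unexpected cache entry behaves like a selector that accepts every message; the report does not say what entry is actually written to the persistFile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of selector-aware forwarding backed by a per-queue selector cache. */
final class SelectorCacheModel {
    /** Hypothetical stand-in for a cached selector that accepts every message. */
    static final String MATCH_ALL = "*";

    // Queue name -> selectors remembered for that queue, kept even when consumers are offline.
    private final Map<String, Set<String>> cachedSelectors = new LinkedHashMap<>();

    /** Remember a selector for a queue. */
    void remember(String queue, String selector) {
        cachedSelectors.computeIfAbsent(queue, q -> new LinkedHashSet<>()).add(selector);
    }

    /** Return the queues whose cached selectors match a message aimed at the given nodes. */
    List<String> route(Set<String> messageTargets) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : cachedSelectors.entrySet()) {
            for (String selector : entry.getValue()) {
                if (selector.equals(MATCH_ALL) || messageTargets.contains(selector)) {
                    receivers.add(entry.getKey());
                    break; // one matching selector is enough for this queue
                }
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        SelectorCacheModel broker = new SelectorCacheModel();
        for (String node : new String[] {"n1", "n2", "n3", "n4"}) {
            broker.remember("Consumer." + node + ".VirtualTopic.document", node);
        }
        Set<String> targets = new LinkedHashSet<>(Arrays.asList("n1", "n2"));
        System.out.println("before: " + broker.route(targets));

        // Assumed effect of opening queueConsumers.jsp for n3: an extra, overly broad cache entry.
        broker.remember("Consumer.n3.VirtualTopic.document", MATCH_ALL);
        System.out.println("after:  " + broker.route(targets));
    }
}
```

In this model the first call routes the message to the n1 and n2 queues only, while the second also includes Consumer.n3.VirtualTopic.document. That matches the symptom described above, under the stated assumption about the extra entry.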