Posted to users@activemq.apache.org by S Sharma <ss...@gmail.com> on 2011/01/26 23:34:47 UTC

Persistent messages drops in pure master-slave configuration

Hi all,

I am getting significant message drops in activemq-5.4.2 pure master-slave
configuration. I am testing topics using the simple producer & asynchronous
consumer from the activemq-cpp-library-3.2.4 (code attached).

The master and slave configurations are also attached. I am testing
(persistent) message drops by:

1. Create four producers from SimpleProducer.cpp, each with line 135 changed to one of:
string text = (string)("App1 Seq No: ") + ix_str;
string text = (string)("App2 Seq No: ") + ix_str;
string text = (string)("App3 Seq No: ") + ix_str; and
string text = (string)("App4 Seq No: ") + ix_str;

2. Create four consumers (identical binaries) from SimpleAsyncConsumer.cpp
all listening to topic "TEST.FOO".

3. Start all consumers:
./simple_async_consumer1 > consumer1
./simple_async_consumer2 > consumer2
./simple_async_consumer3 > consumer3
./simple_async_consumer4 > consumer4

4. Then start all producers:
./simple_producer1  > producer1 & ./simple_producer2 > producer2  &
./simple_producer3 > producer3 & ./simple_producer4 > producer4 &

5. Terminate master broker: kill -9 <MasterPID>

In both slave configurations (shutdownOnMasterFailure="true", and
shutdownOnMasterFailure="false" with the kahadb directory copied manually),
I am noticing that the four consumers do not receive several of the messages
that were sent (likely during the failover process). For instance, the
attached consumer1.txt file has fewer than 40001 messages received.

Can someone please help me decipher what is going wrong? If you need any
clarification on the test case, please let me know.
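
To quantify the drops from the consumer output files, a small checker can
list which sequence numbers are missing for each producer. This is only a
minimal sketch: it assumes each received message is logged on its own line
containing "AppN Seq No: <number>" (the exact output format of
SimpleAsyncConsumer is an assumption here), and collectSequences and
missingInRange are hypothetical helper names, not part of activemq-cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Collects the sequence numbers seen per producer tag ("App1", "App2", ...).
// Assumption: relevant lines contain "<tag> Seq No: <n>"; other lines are skipped.
std::map<std::string, std::set<long>> collectSequences(std::istream& in) {
    const std::string marker = " Seq No: ";
    std::map<std::string, std::set<long>> seen;
    std::string line;
    while (std::getline(in, line)) {
        const std::size_t pos = line.find(marker);
        if (pos == std::string::npos || pos == 0) continue;
        // The tag is the word immediately before the marker.
        const std::size_t space = line.rfind(' ', pos - 1);
        const std::size_t tagStart = (space == std::string::npos) ? 0 : space + 1;
        const std::string tag = line.substr(tagStart, pos - tagStart);
        std::istringstream num(line.substr(pos + marker.size()));
        long seq = 0;
        if (!tag.empty() && (num >> seq)) seen[tag].insert(seq);
    }
    return seen;
}

// Returns every sequence number in [first, last] that is absent from `seen`.
std::vector<long> missingInRange(const std::set<long>& seen, long first, long last) {
    assert(first <= last);
    std::vector<long> missing;
    for (long n = first; n <= last; ++n) {
        if (seen.count(n) == 0) missing.push_back(n);
    }
    return missing;
}
```

Feeding each consumer file through collectSequences (for example via a
std::ifstream) and then calling missingInRange with the range each producer
actually sent shows whether the gaps cluster around the moment the master
was killed.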

Master:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">

    <broker
        xmlns="http://activemq.apache.org/schema/core"
        brokerName="master"
        dataDirectory="/home/activemq/activemq/data"
        waitForSlave="false" >
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="false"
memoryLimit="1500mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="false"
memoryLimit="1500mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
                cleanupInterval="300000" checkpointInterval="50000"
                journalMaxWriteBatchSize="6400k"
                journalMaxFileLength="100g"
                indexCacheSize="100000" indexWriteBatchSize="100000"
             />
        </persistenceAdapter>

        <transportConnectors>
            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration=0"/>
        </transportConnectors>

    </broker>

    <import resource="jetty.xml"/>

</beans>


Slave:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker masterConnectorURI="tcp://masterIPAddress:61616"
brokerName="slave" xmlns="http://activemq.apache.org/schema/core"
shutdownOnMasterFailure="false" >

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" optimizedDispatch="true" />
                    <policyEntry topic=">" optimizedDispatch="true" />
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
                cleanupInterval="300000" checkpointInterval="50000"
                journalMaxWriteBatchSize="6400k"
                journalMaxFileLength="100g"
                indexCacheSize="100000" indexWriteBatchSize="100000"
             />
        </persistenceAdapter>
    <transportConnectors>
      <transportConnector uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
  </broker>
</beans>

Re: Persistent messages drops in pure master-slave configuration

Posted by Gary Tully <ga...@gmail.com>.
You would need durable subscriptions to guarantee delivery. If the producers
fail over before all of the consumers have, the consumers will miss the
messages that were sent while they were disconnected.
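
To make the reasoning concrete, here is a toy in-memory model of a topic. It
is not the ActiveMQ or CMS API; ToyTopic and all of its methods are
hypothetical names used only for illustration. It assumes the simplest
possible semantics: a message published while a non-durable subscriber is
disconnected is dropped for that subscriber, while a durable subscriber gets
it from a backlog on reconnect.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Toy model only: mimics topic delivery to durable and non-durable subscribers.
class ToyTopic {
public:
    void subscribe(const std::string& name, bool durable) {
        Sub& s = subs_[name];
        s.durable = durable;
        s.connected = true;
    }

    // Models a consumer losing its connection (e.g. while the master is down).
    void disconnect(const std::string& name) { subs_.at(name).connected = false; }

    // Models the consumer reconnecting after failover.
    void reconnect(const std::string& name) {
        assert(subs_.count(name) == 1);
        Sub& s = subs_[name];
        s.connected = true;
        // Only a durable subscription has anything stored for it.
        while (!s.backlog.empty()) {
            s.delivered.push_back(s.backlog.front());
            s.backlog.pop_front();
        }
    }

    void publish(const std::string& msg) {
        for (auto& entry : subs_) {
            Sub& s = entry.second;
            if (s.connected) {
                s.delivered.push_back(msg);
            } else if (s.durable) {
                s.backlog.push_back(msg);  // kept until the subscriber returns
            }
            // Disconnected non-durable subscriber: the message is simply missed.
        }
    }

    const std::vector<std::string>& delivered(const std::string& name) const {
        return subs_.at(name).delivered;
    }

private:
    struct Sub {
        bool durable = false;
        bool connected = false;
        std::vector<std::string> delivered;
        std::deque<std::string> backlog;
    };
    std::map<std::string, Sub> subs_;
};
```

In this model, a message published between disconnect and reconnect reaches
only the durable subscriber, which matches the gap seen in the test when the
producers reconnect to the slave first. With activemq-cpp the real
counterpart would be setting a client ID on the connection and creating the
consumer with the CMS Session's createDurableConsumer instead of
createConsumer; check the CMS API documentation for the exact signatures.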

On 28 January 2011 15:39, S Sharma <ss...@gmail.com> wrote:
> Bumping this request. This issue is hindering us using ActiveMQ in our
> enterprise.
>
> Thanks so much!
>
> Sharma



-- 
http://blog.garytully.com
http://fusesource.com

Re: Persistent messages drops in pure master-slave configuration

Posted by S Sharma <ss...@gmail.com>.
Bumping this request. This issue is hindering us using ActiveMQ in our
enterprise.

Thanks so much!

Sharma

