You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by smalah <sd...@ina.fr> on 2010/02/05 12:12:17 UTC

Pure Master/Slave freeze once flowcontrol

Hi, we are testing Pure Master/Slave component with following configuration :


- brokers are located on Vmware ESX in separate VM. 
Linux 5.4.
Active-MQ 5.3.0

master : 

    <broker xmlns="http://activemq.apache.org/schema/core" 
brokerName="localhost" waitForSlave="true" shutdownOnSlaveFailure="true"
persistent="true" useJmx="true" dataDirectory="${activemq.base}/data">
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
    <destinations>
      <queue physicalName="TEST.FOO" />
    </destinations>
        <persistenceAdapter>
            <amqPersistenceAdapter 
directory="${activemq.base}/data/amqstore" syncOnWrite="true"
maxFileLength="128mb"  directoryArchive="${activemq.base}/archive/amqstore"
archiveDataLogs="true"/>
        </persistenceAdapter>
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <fileCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="64mb">
                    <deadLetterStrategy>
                      <individualDeadLetterStrategy
queuePrefix="Test.DLQ."/>
                    </deadLetterStrategy>
                    <pendingQueuePolicy>
                        <fileQueueCursor />
                    </pendingQueuePolicy>
               </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>
        <systemUsage>
            <systemUsage >
                <memoryUsage>
                    <memoryUsage limit="128mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
        </transportConnectors>
    </broker>

slave :
    <broker xmlns="http://activemq.apache.org/schema/core" 
masterConnectorURI="tcp://mombrks02.ina.fr:61616"  brokerName="localhost"
shutdownOnMasterFailure="true" useJmx="true"
dataDirectory="${activemq.base}/data">

    <destinations>
      <queue physicalName="TEST.FOO" />
    </destinations>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <persistenceAdapter>
            <amqPersistenceAdapter 
directory="${activemq.base}/data/amqstore" maxFileLength="128mb" 
directoryArchive="${activemq.base}/archive/amqstore"
archiveDataLogs="true"/>
        </persistenceAdapter>

stomp producers post presistent messages until they are suspended by
FlowControl mechanism.
->on "stand alone" broker, when consumers get messages, producers are
unlocked as usual.

-> but on pure master/slave, stomp consumers cannot get messages and as a
consequence, all producers/consumers are freezed. rebooting broker is the
only way to go back to normal behavior (no lost messages). :confused:

producer code :
<...>
        # Connect to the broker.
        my $stomp = Net::Stomp->new(
                        { hostname => "$_",
                          port     => '61613'
                        }
                );
        print "connect to broker $_ \n";
        $stomp->connect(
                                { login    => "$login",
                                      passscode => "$pass",
                                        clientid => 'CLIENT'
                                }
                        );
        my $beg_time=time;
        print "send messages $_ at $beg_time\n";
        my $i=0;

        while ($i<10000)
        {
                # Send test message containing $time timestamp.
                $stomp->send(
                                { destination => "$queue",
                                  body        => "test $i"x32768,
                                  persistent=> "true"
                                }
                        );
                print "message $i ok\n";
                $i++;
        }
<...>


and get_message:

        # Connect to the broker.
        my $stomp = Net::Stomp->new(
                    { hostname => "$_",
                      port     => '61613'
            }
        );
        $stomp->connect(
                                { login    => "$login",
                                      passscode => "$pass",
                                        clientid => 'CLIENT_GET'
                                }
                        );
        print "connected to broker $_\n";
        # Send test message containing $time timestamp.
        # Subscribe to messages from the $queue.
        $stomp->subscribe(
            { destination => "$queue",
              ack => 'client',
              prefetchSize => 1,
                persistent=>"true"
            }
        );
        my $i=0;
        my $can_read=1;
        my $framebody;
print "subscribed to queue $queue\n";
        while ($i>=0 && $can_read)
        {
                # Wait max 5 seconds for message to appear.
                my $can_read = $stomp->can_read({ timeout => "5" });
print "$i:$can_read\n";
                if ( $can_read ) {
                    # There is a message to collect.
                    my $frame = $stomp->receive_frame;
                    $stomp->ack( { frame => $frame } );
                    $framebody=$frame->body;
print "length=".length($framebody);
                }
                else {
                    # There's still to message to collect.
                    print "CRITICAL: Timed out while trying to collect the
message\n";
                    $exitcode="critical";
                }
                # Whatever the outcome, we have managed to connect to given
broker.
                # There's no need to try the other.
                $i++;
        }
        print "nb lus : $i";
        $stomp->disconnect;
        exit $error{"$exitcode"};
}
print "CRITICAL: No connection to ActiveMQ; tried $evalcount out of " .
@hosts . " brokers\n";
exit $error{"critical"};

on producer side:
...
message 340 ok
message 341 ok
message 342 ok
<freeze>

on consumer side, we start consumer only when producers are suspended by
flow control :
[root@momcltls01 client_perl]# perl ./get_massif.pl
connected to broker mombrks02.ina.fr
subscribed to queue /queue/TEST.FOO
0:0
CRITICAL: Timed out while trying to collect the message
1:0
CRITICAL: Timed out while trying to collect the message

what's going wrong??

-- 
View this message in context: http://old.nabble.com/Pure-Master-Slave-freeze-once-flowcontrol-tp27466380p27466380.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.