Posted to users@activemq.apache.org by smalah <sd...@ina.fr> on 2010/02/05 12:12:17 UTC
Pure Master/Slave freeze once flowcontrol
Hi, we are testing the Pure Master/Slave setup with the following configuration:
- The brokers run on VMware ESX, each in a separate VM.
- Linux 5.4
- ActiveMQ 5.3.0
master :
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" waitForSlave="true" shutdownOnSlaveFailure="true"
persistent="true" useJmx="true" dataDirectory="${activemq.base}/data">
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<destinations>
<queue physicalName="TEST.FOO" />
</destinations>
<persistenceAdapter>
<amqPersistenceAdapter
directory="${activemq.base}/data/amqstore" syncOnWrite="true"
maxFileLength="128mb" directoryArchive="${activemq.base}/archive/amqstore"
archiveDataLogs="true"/>
</persistenceAdapter>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
<pendingSubscriberPolicy>
<fileCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="64mb">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="Test.DLQ."/>
</deadLetterStrategy>
<pendingQueuePolicy>
<fileQueueCursor />
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage >
<memoryUsage>
<memoryUsage limit="128mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
</transportConnectors>
</broker>
slave :
<broker xmlns="http://activemq.apache.org/schema/core"
masterConnectorURI="tcp://mombrks02.ina.fr:61616" brokerName="localhost"
shutdownOnMasterFailure="true" useJmx="true"
dataDirectory="${activemq.base}/data">
<destinations>
<queue physicalName="TEST.FOO" />
</destinations>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<amqPersistenceAdapter
directory="${activemq.base}/data/amqstore" maxFileLength="128mb"
directoryArchive="${activemq.base}/archive/amqstore"
archiveDataLogs="true"/>
</persistenceAdapter>
STOMP producers post persistent messages until they are suspended by the
flow control mechanism.
-> On a stand-alone broker, once consumers start taking messages, the
producers are unblocked as usual.
-> On pure master/slave, however, STOMP consumers cannot get any messages, and
as a consequence all producers and consumers stay frozen. Restarting the
broker is the only way to get back to normal behavior (no messages are lost).
producer code :
<...>
# Connect to the broker.
my $stomp = Net::Stomp->new(
{ hostname => "$_",
port => '61613'
}
);
print "connect to broker $_ \n";
$stomp->connect(
{ login => "$login",
passcode => "$pass",
clientid => 'CLIENT'
}
);
my $beg_time=time;
print "send messages $_ at $beg_time\n";
my $i=0;
while ($i<10000)
{
# Send a persistent test message whose body is "test $i" repeated 32768 times.
$stomp->send(
{ destination => "$queue",
body => "test $i"x32768,
persistent=> "true"
}
);
print "message $i ok\n";
$i++;
}
<...>
and get_message:
# Connect to the broker.
my $stomp = Net::Stomp->new(
{ hostname => "$_",
port => '61613'
}
);
$stomp->connect(
{ login => "$login",
passcode => "$pass",
clientid => 'CLIENT_GET'
}
);
print "connected to broker $_\n";
# Subscribe to messages from the $queue.
$stomp->subscribe(
{ destination => "$queue",
ack => 'client',
prefetchSize => 1,
persistent=>"true"
}
);
my $i=0;
my $can_read=1;
my $framebody;
print "subscribed to queue $queue\n";
while ($i>=0 && $can_read)
{
# Wait max 5 seconds for message to appear.
my $can_read = $stomp->can_read({ timeout => "5" });
print "$i:$can_read\n";
if ( $can_read ) {
# There is a message to collect.
my $frame = $stomp->receive_frame;
$stomp->ack( { frame => $frame } );
$framebody=$frame->body;
print "length=".length($framebody);
}
else {
# There's still no message to collect.
print "CRITICAL: Timed out while trying to collect the message\n";
$exitcode="critical";
}
# Whatever the outcome, we have managed to connect to given broker.
# There's no need to try the other.
$i++;
}
print "messages read: $i";
$stomp->disconnect;
exit $error{"$exitcode"};
}
print "CRITICAL: No connection to ActiveMQ; tried $evalcount out of " .
@hosts . " brokers\n";
exit $error{"critical"};
on producer side:
...
message 340 ok
message 341 ok
message 342 ok
<freeze>
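For a sense of scale, here is a rough sketch of how much data a single producer has sent by the time it stops at message 342. It only counts body bytes ("test $i" repeated 32768 times, as in the producer loop above) and ignores STOMP headers and any per-message overhead inside the broker, so it is an estimate, not the broker's own accounting. The helper names body_size and total_sent are made up for this illustration.

```perl
use strict;
use warnings;

# Size in bytes of the body sent for message number $i:
# the ASCII string "test $i" repeated 32768 times.
sub body_size {
    my ($i) = @_;
    return length("test $i") * 32768;
}

# Total body bytes sent for messages 0 .. $last inclusive.
sub total_sent {
    my ($last) = @_;
    my $total = 0;
    $total += body_size($_) for 0 .. $last;
    return $total;
}

my $last = 342;    # last message reported "ok" in the producer output
printf "body of message %d: %d bytes\n", $last, body_size($last);
printf "total for 0..%d: %.1f MiB\n", $last,
    total_sent($last) / ( 1024 * 1024 );
```

Under these assumptions each message near the freeze is 256 KiB and one producer has pushed about 82 MiB, which is in the range of the 64mb queue memoryLimit and the 128mb memoryUsage limit from the configuration. With several producers running at once the total would be correspondingly higher.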
on consumer side (we start the consumer only once the producers have been
suspended by flow control):
[root@momcltls01 client_perl]# perl ./get_massif.pl
connected to broker mombrks02.ina.fr
subscribed to queue /queue/TEST.FOO
0:0
CRITICAL: Timed out while trying to collect the message
1:0
CRITICAL: Timed out while trying to collect the message
What is going wrong?
--
View this message in context: http://old.nabble.com/Pure-Master-Slave-freeze-once-flowcontrol-tp27466380p27466380.html