You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Phillip Henry (Created) (JIRA)" <ji...@apache.org> on 2012/02/22 00:14:48 UTC
[jira] [Created] (AMQ-3732) Different methods synchronizing on
different mutexes when changing the same field
Different methods synchronizing on different mutexes when changing the same field
---------------------------------------------------------------------------------
Key: AMQ-3732
URL: https://issues.apache.org/jira/browse/AMQ-3732
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.5.1
Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
Reporter: Phillip Henry
org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...) and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...).
This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++ in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge() mid-way through this operation).
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (AMQ-3732) Different methods synchronizing on
different mutexes when changing the same field
Posted by "Timothy Bish (Resolved) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish resolved AMQ-3732.
-------------------------------
Resolution: Fixed
Fix Version/s: 5.6.0
Assignee: Timothy Bish
Fixed on trunk.
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
> Key: AMQ-3732
> URL: https://issues.apache.org/jira/browse/AMQ-3732
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> Reporter: Phillip Henry
> Assignee: Timothy Bish
> Labels: concurrency
> Fix For: 5.6.0
>
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...) and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...).
> This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++ in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge() mid-way through this operation).
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (AMQ-3732) Different methods synchronizing on
different mutexes when changing the same field
Posted by "Phillip Henry (Commented) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13217443#comment-13217443 ]
Phillip Henry commented on AMQ-3732:
------------------------------------
OK, it's not quite a unit test - it's a stress test that uses JUnit - but it illustrates the point.
This code fails on my Mac (2.66GHz Intel Core 2 Duo) after a non-deterministic number of iterations.
Most of the code is just simple stub implementations of ActiveMQ classes and interfaces with one or two methods implemented or overriden. (I tried using JMock but it didn't seem multi-threaded friendly). Basically, it starts two threads - one to pull a message, one to acknowledge. The pulling thread should increase prefetchExtension in pullMessage() thus:
{code}
synchronized(this) {
prefetchExtension++;
dispatchCounterBeforePull = dispatchCounter;
}
{code}
The other thread calls acknowledge with the stubbed objects and PrefetchSubscription object itself in such a state that this line is executed:
{code}
prefetchExtension = Math.max(prefetchExtension, index + 1);
{code}
where index is the index of the number of dispatched MessageReference collection. This should never be more than 0 since only message is ever dispatch()-ed by the test. All objects are instantiated anew on each iteration.
Given this, prefetchExtension should only ever be 0 (neither test has hit their target code) or 1 (from prefetchExtension++ or Math.max(prefetchExtension, 1) because index is always 0).
However, the test demonstrates that occasionally prefetchExtension is 2. I posit that this is because the threads can sometimes execute their lines at the same time; prefetchExtension++ is not atomic (it's actually prefetchExtension = prefetchExtension + 1); and changing it is not guarded by the same mutex.
For example, the order might look like:
Step 1. PULL THREAD: add 1 to prefetchExtension. To do this, find out what prefetchExtension is. But before this thread can do so, context switches to the thread executing acknowledge() in step 2, immediately below.
Step 2. ACK THREAD: prefetchExtension is the maximum of its current value and 1. It looks like its current value is 0 so set prefetchExtension to 1.
Step 3. PULL THREAD: has found that prefetchExtension is 1 and adds one to it giving a total of 2.
Test case:
{code}
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import junit.framework.TestCase;
public class PrefetchSubscriptionStressTest extends TestCase {
private PrefetchSubscriptionStub toTest;
private final MessageId messageId = new MessageId();
private MessageAck ack;
private MessageReference node;
protected void setUp() throws Exception {
super.setUp();
}
public void testMultiThreadedAccessToPrefetchExtension() throws InterruptedException, InvalidSelectorException {
final int callsPerThread = 1;
final int numIterations = 1000;
for (int i = 0 ; i < numIterations ; i++) {
final MessagePull pull = initializeStubs();
Thread messagePuller = new Thread(new MessagePuller(callsPerThread, pull));
Thread dispatchAndAcknowledger = new Thread(new DispatchAndAcknowledger(ack, node, callsPerThread));
startThreads(messagePuller, dispatchAndAcknowledger);
waitForThreadsToStop(messagePuller, dispatchAndAcknowledger);
int actualPrefetchExtensions = toTest.getPrefetchExtension();
assertEquals("failed on iteration: " + i, 1, actualPrefetchExtensions);
}
}
private void waitForThreadsToStop(Thread messagePuller, Thread dispatchAndAcknowledger) throws InterruptedException {
dispatchAndAcknowledger.join();
messagePuller.join();
}
private void startThreads(Thread messagePuller, Thread dispatchAndAcknowledger) {
messagePuller.start();
dispatchAndAcknowledger.start();
}
private MessagePull initializeStubs() throws InvalidSelectorException {
initialise();
primeMessagesAndDestinations();
final MessagePull pull = primeForPullMessage();
return pull;
}
private void dispatchAndAcknowledge(final MessageAck ack, final MessageReference node, final int numToDispatch)
throws IOException, Exception {
ConnectionContext context = null;
for (int i = 0 ; i < numToDispatch ; i++) {
toTest.dispatch(node);
}
toTest.acknowledge(context , ack);
}
public class MessagePuller implements Runnable {
private final int totalRuns;
private final MessagePull pull;
public MessagePuller(int totalRuns, MessagePull pull) {
super();
this.totalRuns = totalRuns;
this.pull = pull;
}
@Override
public void run() {
for (int i = 0 ; i < totalRuns ; i++) {
try {
pullMessage(pull);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void pullMessage(final MessagePull pull) throws Exception {
ConnectionContext context = null;
toTest.pullMessage(context, pull);
}
private MessagePull primeForPullMessage() {
final MessagePull pull = new MessagePull() {
@Override
public long getTimeout() {
return -1;
}
};
return pull;
}
public class DispatchAndAcknowledger implements Runnable {
private final MessageAck ack;
private final MessageReference node;
private final int totalRuns;
public DispatchAndAcknowledger(MessageAck ack, MessageReference node, int totalRuns) {
super();
this.ack = ack;
this.node = node;
this.totalRuns = totalRuns;
}
@Override
public void run() {
for (int i = 0 ; i < totalRuns ; i++) {
try {
dispatchAndAcknowledge(ack, node, 1);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void initialise() throws InvalidSelectorException {
SystemUsage usageManager = null;
final Connection connection = new Connection() {
@Override
public void stop() throws Exception { }
@Override
public void start() throws Exception { }
@Override
public void updateClient(ConnectionControl control) { }
@Override
public void serviceExceptionAsync(IOException e) { }
@Override
public void serviceException(Throwable error) { }
@Override
public Response service(Command command) { return null; }
@Override
public boolean isSlow() { return false; }
@Override
public boolean isNetworkConnection() { return false; }
@Override
public boolean isManageable() { return false; }
@Override
public boolean isFaultTolerantConnection() { return false; }
@Override
public boolean isConnected() { return false; }
@Override
public boolean isBlocked() { return false; }
@Override
public boolean isActive() { return false; }
@Override
public ConnectionStatistics getStatistics() { return null; }
@Override
public String getRemoteAddress() { return null; }
@Override
public int getDispatchQueueSize() { return 0; }
@Override
public Connector getConnector() { return null; }
@Override
public String getConnectionId() { return null; }
@Override
public void dispatchSync(Command message) { }
@Override
public void dispatchAsync(Command command) { }
};
final ConnectionContext context = new ConnectionContext() {
@Override
public Connection getConnection() {
return connection;
}
};
final Broker broker = new Broker() {
@Override
public void stop() throws Exception { }
@Override
public void start() throws Exception { }
@Override
public void send(ProducerBrokerExchange producerExchange, Message message)
throws Exception { }
@Override
public void removeSubscription(ConnectionContext context,
RemoveSubscriptionInfo info) throws Exception { }
@Override
public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long timeout) throws Exception { }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception { }
@Override
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification)
throws Exception { }
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) { }
@Override
public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception { return null; }
@Override
public Set<Destination> getDestinations(ActiveMQDestination destination) { return null; }
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() { return null; }
@Override
public void gc() { }
@Override
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination, boolean createIfTemporary)
throws Exception { return null; }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception { return null; }
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange,
MessageAck ack) throws Exception { }
@Override
public void slowConsumer(ConnectionContext context,
Destination destination, Subscription subs) { }
@Override
public void setAdminConnectionContext(
ConnectionContext adminConnectionContext) { }
@Override
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference, Subscription subscription) { }
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid)
throws Exception { }
@Override
public void removeSession(ConnectionContext context, SessionInfo info)
throws Exception { }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception { }
@Override
public void removeDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception { }
@Override
public void removeConnection(ConnectionContext context,
ConnectionInfo info, Throwable error) throws Exception { }
@Override
public void removeBroker(Connection connection, BrokerInfo info) { }
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid)
throws Exception { return 0; }
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) { }
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) { }
@Override
public void nowMasterBroker() { }
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) { }
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo,
boolean createdByDuplex, String remoteIp) { }
@Override
public void messageExpired(ConnectionContext context,
MessageReference messageReference, Subscription subscription) { }
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) { }
@Override
public void messageDelivered(ConnectionContext context,
MessageReference messageReference) { }
@Override
public void messageConsumed(ConnectionContext context,
MessageReference messageReference) { }
@Override
public boolean isStopped() { return false; }
@Override
public void isFull(ConnectionContext context, Destination destination,
Usage usage) { }
@Override
public boolean isFaultTolerantConfiguration() { return false; }
@Override
public boolean isExpired(MessageReference messageReference) { return false; }
@Override
public URI getVmConnectorURI() { return null; }
@Override
public PListStore getTempDataStore() { return null; }
@Override
public Scheduler getScheduler() { return null; }
@Override
public Broker getRoot() { return null; }
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context)
throws Exception { return null; }
@Override
public BrokerInfo[] getPeerBrokerInfos() { return null; }
@Override
public ThreadPoolExecutor getExecutor() { return null; }
@Override
public Set<ActiveMQDestination> getDurableDestinations() { return null; }
@Override
public ActiveMQDestination[] getDestinations() throws Exception { return null; }
@Override
public Connection[] getClients() throws Exception { return null; }
@Override
public BrokerService getBrokerService() { return null; }
@Override
public long getBrokerSequenceId() { return 0; }
@Override
public String getBrokerName() { return null; }
@Override
public BrokerId getBrokerId() { return null; }
@Override
public ConnectionContext getAdminConnectionContext() { return null; }
@Override
public Broker getAdaptor(Class type) { return null; }
@Override
public void forgetTransaction(ConnectionContext context,
TransactionId transactionId) throws Exception { }
@Override
public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) { }
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid,
boolean onePhase) throws Exception { }
@Override
public void brokerServiceStarted() { }
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid)
throws Exception { }
@Override
public void addSession(ConnectionContext context, SessionInfo info)
throws Exception { }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception { }
@Override
public void addDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception { }
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception { }
@Override
public void addBroker(Connection connection, BrokerInfo info) { }
};
final ActiveMQDestination activeMQDestination = new ActiveMQDestination() {
@Override
public byte getDataStructureType() {
return 0;
}
@Override
protected String getQualifiedPrefix() {
return null;
}
@Override
public byte getDestinationType() {
return 0;
}
@Override
public String getPhysicalName() {
return "test";
}
@Override
public boolean isComposite() {
return false;
}
};
final ConsumerInfo info = new ConsumerInfo() {
@Override
public ActiveMQDestination getDestination() {
return activeMQDestination;
}
};
toTest = new PrefetchSubscriptionStub(broker, usageManager, context, info);
}
private class PrefetchSubscriptionStub extends PrefetchSubscription {
public PrefetchSubscriptionStub(Broker broker,
SystemUsage usageManager, ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
super(broker, usageManager, context, info);
}
@Override
public void destroy() {
// TODO Auto-generated method stub
}
@Override
protected boolean isDropped(MessageReference node) {
// TODO Auto-generated method stub
return false;
}
@Override
protected boolean canDispatch(MessageReference node)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
protected void acknowledge(ConnectionContext context,
MessageAck ack, MessageReference node) throws IOException {
// TODO Auto-generated method stub
}
@Override
public boolean isSlave() {
return false;
}
@Override
public int getPrefetchSize() {
return 0;
}
@Override
protected void onDispatch(final MessageReference node, final Message message) {
}
@Override
protected void dispatchPending() throws IOException {
}
public int getPrefetchExtension() {
return prefetchExtension;
}
}
private void primeMessagesAndDestinations() {
final Message message = new Message() {
@Override
public byte getDataStructureType() { return 0; }
@Override
public Response visit(CommandVisitor visitor) throws Exception { return null; }
@Override
public Message copy() { return null; }
@Override
public void clearBody() throws JMSException { }
};
final Destination destination = new Destination() {
@Override
public boolean iterate() { return false; }
@Override
public void stop() throws Exception { }
@Override
public void start() throws Exception { }
@Override
public void wakeup() { }
@Override
public void slowConsumer(ConnectionContext context, Subscription subs) { }
@Override
public void setUseCache(boolean useCache) { }
@Override
public void setProducerFlowControl(boolean value) { }
@Override
public void setMinimumMessageSize(int minimumMessageSize) { }
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) { }
@Override
public void setMaxPageSize(int maxPageSize) { }
@Override
public void setMaxBrowsePageSize(int maxPageSize) { }
@Override
public void setMaxAuditDepth(int maxAuditDepth) { }
@Override
public void setLazyDispatch(boolean value) { }
@Override
public void setEnableAudit(boolean enableAudit) { }
@Override
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { }
@Override
public void setBlockedProducerWarningInterval(
long blockedProducerWarningInterval) { }
@Override
public void setAlwaysRetroactive(boolean value) { }
@Override
public void send(ProducerBrokerExchange producerExchange,
Message messageSend) throws Exception { }
@Override
public void removeSubscription(ConnectionContext context, Subscription sub,
long lastDeliveredSequenceId) throws Exception { }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception { }
@Override
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification)
throws Exception { }
@Override
public void messageExpired(ConnectionContext context, Subscription subs,
MessageReference node) { }
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) { }
@Override
public void messageDelivered(ConnectionContext context,
MessageReference messageReference) { }
@Override
public void messageConsumed(ConnectionContext context,
MessageReference messageReference) { }
@Override
public void markForGC(long timeStamp) { }
@Override
public boolean isUseCache() { return false; }
@Override
public boolean isProducerFlowControl() { return false; }
@Override
public boolean isPrioritizedMessages() { return false; }
@Override
public boolean isLazyDispatch() { return false; }
@Override
public void isFull(ConnectionContext context, Usage<?> usage) { }
@Override
public boolean isEnableAudit() { return false; }
@Override
public boolean isDisposed() { return false; }
@Override
public boolean isAlwaysRetroactive() { return false; }
@Override
public boolean isActive() { return false; }
@Override
public SlowConsumerStrategy getSlowConsumerStrategy() { return null; }
@Override
public String getName() { return null; }
@Override
public int getMinimumMessageSize() { return 0; }
@Override
public MessageStore getMessageStore() { return null; }
@Override
public MemoryUsage getMemoryUsage() { return null; }
@Override
public int getMaxProducersToAudit() { return 0; }
@Override
public int getMaxPageSize() { return 0; }
@Override
public int getMaxBrowsePageSize() { return 0; }
@Override
public int getMaxAuditDepth() { return 0; }
@Override
public long getInactiveTimoutBeforeGC() { return 0; }
@Override
public DestinationStatistics getDestinationStatistics() { return null; }
@Override
public DeadLetterStrategy getDeadLetterStrategy() { return null; }
@Override
public int getCursorMemoryHighWaterMark() { return 0; }
@Override
public List<Subscription> getConsumers() { return null; }
@Override
public long getBlockedProducerWarningInterval() { return 0; }
@Override
public ActiveMQDestination getActiveMQDestination() { return null; }
@Override
public void gc() { }
@Override
public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) { }
@Override
public void dispose(ConnectionContext context) throws IOException { }
@Override
public boolean canGC() { return false; }
@Override
public Message[] browse() { return null; }
@Override
public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception { }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception { }
@Override
public void acknowledge(ConnectionContext context, Subscription sub,
MessageAck ack, MessageReference node) throws IOException { }
};
ack = new MessageAck() {
@Override
public boolean isDeliveredAck() {
return true;
}
@Override
public MessageId getLastMessageId() {
return messageId;
}
};
node = new MessageReference() {
@Override
public boolean isPersistent() {return false; }
@Override
public boolean isExpired() {
return false;
}
@Override
public boolean isDropped() { return false; }
@Override
public boolean isAdvisory() { return false; }
@Override
public int incrementReferenceCount() { return 0; }
@Override
public void incrementRedeliveryCounter() { }
@Override
public ConsumerId getTargetConsumerId() { return null; }
@Override
public int getSize() { return 0; }
@Override
public Destination getRegionDestination() { return destination; }
@Override
public int getReferenceCount() { return 0; }
@Override
public int getRedeliveryCounter() { return 0; }
@Override
public MessageId getMessageId() { return messageId; }
@Override
public Message getMessageHardRef() { return null; }
@Override
public Message getMessage() { return message; }
@Override
public int getGroupSequence() { return 0; }
@Override
public String getGroupID() { return null; }
@Override
public long getExpiration() { return 0; }
@Override
public int decrementReferenceCount() { return 0; }
};
}
}
{code}
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
> Key: AMQ-3732
> URL: https://issues.apache.org/jira/browse/AMQ-3732
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> Reporter: Phillip Henry
> Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...) and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...).
> This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++ in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge() mid-way through this operation).
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (AMQ-3732) Different methods synchronizing on
different mutexes when changing the same field
Posted by "Phillip Henry (Commented) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13213979#comment-13213979 ]
Phillip Henry commented on AMQ-3732:
------------------------------------
Hard to show in a unit test as it's not deterministic.
I noticed it while I was trying to work out why we had non-deterministic behaviour in our AMQ stack. (Not found what the problem is yet. May very well be a misconfiguration at our end...).
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
> Key: AMQ-3732
> URL: https://issues.apache.org/jira/browse/AMQ-3732
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> Reporter: Phillip Henry
> Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...) and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...).
> This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++ in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge() mid-way through this operation).
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (AMQ-3732) Different methods synchronizing on
different mutexes when changing the same field
Posted by "Timothy Bish (Commented) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13213625#comment-13213625 ]
Timothy Bish commented on AMQ-3732:
-----------------------------------
Do you have a junit test case that shows the issue here.
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
> Key: AMQ-3732
> URL: https://issues.apache.org/jira/browse/AMQ-3732
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> Reporter: Phillip Henry
> Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...) and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...).
> This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++ in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge() mid-way through this operation).
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira