You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/10 12:56:33 UTC
svn commit: r1359595 - in
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server:
AMQChannel.java protocol/AMQProtocolEngine.java
protocol/AMQProtocolSession.java
Author: kwall
Date: Tue Jul 10 10:56:32 2012
New Revision: 1359595
URL: http://svn.apache.org/viewvc?rev=1359595&view=rev
Log:
QPID-4121: AMQProtocolEngine now uses lock to prevent the thread-unsafe use of AMQChannel's transaction which caused it to throw a ConcurrentModificationException.
Applied patch from Philip Harvey <ph...@philharveyonline.com> and Keith Wall <kw...@apache.org>.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jul 10 10:56:32 2012
@@ -1532,7 +1532,7 @@ public class AMQChannel implements Sessi
_logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
}
- // Close connection for idle or open transactions that have timed out
+ // Close session for idle or open transactions that have timed out
if (idleClose > 0L && idleTime > idleClose)
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jul 10 10:56:32 2012
@@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
@@ -152,8 +155,11 @@ public class AMQProtocolEngine implement
private long _lastReceivedTime;
private boolean _blocking;
+ private final Lock _receivedLock;
+
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
+ _receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -225,6 +231,8 @@ public class AMQProtocolEngine implement
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+
+ _receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -249,6 +257,10 @@ public class AMQProtocolEngine implement
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
}
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
private void receiveComplete()
@@ -815,7 +827,7 @@ public class AMQProtocolEngine implement
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
try
{
@@ -1308,17 +1320,25 @@ public class AMQProtocolEngine implement
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
- int channelId = ((AMQChannel)session).getChannelId();
- closeChannel(channelId);
+ _receivedLock.lock();
+ try
+ {
+ int channelId = ((AMQChannel)session).getChannelId();
+ closeChannel(channelId);
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause.getCode(),
- new AMQShortString(message),
- 0,0);
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
- writeFrame(responseBody.generateFrame(channelId));
+ writeFrame(responseBody.generateFrame(channelId));
+ }
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
public void close(AMQConstant cause, String message) throws AMQException
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jul 10 10:56:32 2012
@@ -154,10 +154,6 @@ public interface AMQProtocolSession exte
void closeProtocolSession();
- /** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
-
-
/** @return a key that uniquely identifies this session */
Object getKey();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org