You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/04/19 00:07:03 UTC

svn commit: r530180 - in /incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp: qpid/QpidAMQPChannel.java qpid/QpidAMQPDtxDemarcation.java state/AMQPState.java

Author: rajith
Date: Wed Apr 18 15:07:01 2007
New Revision: 530180

URL: http://svn.apache.org/viewvc?view=rev&rev=530180
Log:
added state support for distributed transactions

Modified:
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java Wed Apr 18 15:07:01 2007
@@ -130,7 +130,7 @@
 
 			//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
 			_channelNotOpend.await();
-			checkIfConnectionClosed();
+			checkIfChannelClosed();
 			AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
 			notifyState(AMQPState.CHANNEL_OPENED);
 			_currentState = AMQPState.CHANNEL_OPENED;
@@ -198,7 +198,7 @@
 
 			//_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
 			_channelFlowNotResponded.await();
-			checkIfConnectionClosed();
+			checkIfChannelClosed();
 			AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
 			handleChannelFlowState(_channelFlowOkBody.active);
 			return _channelFlowOkBody;
@@ -228,7 +228,7 @@
 
 			//_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
 			_channelNotResumed.await();
-			checkIfConnectionClosed();
+			checkIfChannelClosed();
 			AMQPValidator.throwExceptionOnNull(_channelOkBody,
 					"The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
 			notifyState(AMQPState.CHANNEL_OPENED);
@@ -330,7 +330,7 @@
 		}
 	}
 
-	private void checkIfConnectionClosed() throws AMQPException
+	private void checkIfChannelClosed() throws AMQPException
 	{
 		if (_channelCloseBody != null)
 		{

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java Wed Apr 18 15:07:01 2007
@@ -1,9 +1,11 @@
 package org.apache.qpid.nclient.amqp.qpid;
 
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.DtxDemarcationEndBody;
 import org.apache.qpid.framing.DtxDemarcationEndOkBody;
 import org.apache.qpid.framing.DtxDemarcationSelectBody;
@@ -11,16 +13,24 @@
 import org.apache.qpid.framing.DtxDemarcationStartBody;
 import org.apache.qpid.framing.DtxDemarcationStartOkBody;
 import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
 import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
 
-public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
+public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
 {
 	private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class);
 
-	// the channelId assigned for this channel
+	// the channelId that will be used for transactions
 	private int _channelId;
 
 	private Phase _phase;
@@ -29,34 +39,88 @@
 
 	private AMQPStateManager _stateManager;
 
-	private final AMQPState[] _validCloseStates = new AMQPState[]
-	{ AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+	private final AMQPState[] _validEndStates = new AMQPState[]
+	{ AMQPState.DTX_STARTED };
 
-	private final AMQPState[] _validResumeStates = new AMQPState[]
-	{ AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+	private final AMQPState[] _validStartStates = new AMQPState[]
+	{ AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
 
 	// The wait period until a server sends a respond
 	private long _serverTimeOut = 1000;
 
 	private final Lock _lock = new ReentrantLock();
 	
+	private final Condition _dtxNotSelected = _lock.newCondition();
+
+	private final Condition _channelNotClosed = _lock.newCondition();
 	
-	public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
+	private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+	
+	protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
 	{
-		// TODO Auto-generated method stub
-		return null;
+		_channelId = channelId;
+		_phase = phase;
+		_stateManager = stateManager;
+		_currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
+		_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+	}	
+
+	/**
+	 * ------------------------------------------- 
+	 * API Methods
+	 *  --------------------------------------------
+	 */
+	public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_dtxDemarcationSelectOkBody = null;
+			checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_dtxNotSelected.await();
+			AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
+			notifyState(AMQPState.CHANNEL_OPENED);
+			_currentState = AMQPState.CHANNEL_OPENED;
+			return _dtxDemarcationSelectOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in dtx.select", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
 	}
 
-	public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
+	public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
 	{
 		// TODO Auto-generated method stub
 		return null;
 	}
-
-	public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
+	
+	public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
 	{
 		// TODO Auto-generated method stub
 		return null;
 	}
+	
+	/**
+	 * ------------------------------------------- 
+	 * AMQPMethodListener methods
+	 * --------------------------------------------
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		return true;
+	}
 
+	private void notifyState(AMQPState newState) throws AMQPException
+	{
+		_stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+	}
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java Wed Apr 18 15:07:01 2007
@@ -57,4 +57,10 @@
     public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");    
     public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
     public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
+    
+    // Distributed Transaction state
+    public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED");
+    public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED");
+    public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED");
+    public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
 }