You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/04/19 00:07:03 UTC
svn commit: r530180 - in
/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp:
qpid/QpidAMQPChannel.java qpid/QpidAMQPDtxDemarcation.java
state/AMQPState.java
Author: rajith
Date: Wed Apr 18 15:07:01 2007
New Revision: 530180
URL: http://svn.apache.org/viewvc?view=rev&rev=530180
Log:
added state support for distributed transactions
Modified:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java Wed Apr 18 15:07:01 2007
@@ -130,7 +130,7 @@
//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelNotOpend.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
notifyState(AMQPState.CHANNEL_OPENED);
_currentState = AMQPState.CHANNEL_OPENED;
@@ -198,7 +198,7 @@
//_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelFlowNotResponded.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
handleChannelFlowState(_channelFlowOkBody.active);
return _channelFlowOkBody;
@@ -228,7 +228,7 @@
//_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
_channelNotResumed.await();
- checkIfConnectionClosed();
+ checkIfChannelClosed();
AMQPValidator.throwExceptionOnNull(_channelOkBody,
"The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
notifyState(AMQPState.CHANNEL_OPENED);
@@ -330,7 +330,7 @@
}
}
- private void checkIfConnectionClosed() throws AMQPException
+ private void checkIfChannelClosed() throws AMQPException
{
if (_channelCloseBody != null)
{
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java Wed Apr 18 15:07:01 2007
@@ -1,9 +1,11 @@
package org.apache.qpid.nclient.amqp.qpid;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.DtxDemarcationEndBody;
import org.apache.qpid.framing.DtxDemarcationEndOkBody;
import org.apache.qpid.framing.DtxDemarcationSelectBody;
@@ -11,16 +13,24 @@
import org.apache.qpid.framing.DtxDemarcationStartBody;
import org.apache.qpid.framing.DtxDemarcationStartOkBody;
import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
-public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation
+public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
{
private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class);
- // the channelId assigned for this channel
+ // the channelId that will be used for transactions
private int _channelId;
private Phase _phase;
@@ -29,34 +39,88 @@
private AMQPStateManager _stateManager;
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+ private final AMQPState[] _validEndStates = new AMQPState[]
+ { AMQPState.DTX_STARTED };
- private final AMQPState[] _validResumeStates = new AMQPState[]
- { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+ private final AMQPState[] _validStartStates = new AMQPState[]
+ { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
// The wait period until a server sends a respond
private long _serverTimeOut = 1000;
private final Lock _lock = new ReentrantLock();
+ private final Condition _dtxNotSelected = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
- public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
+ private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+
+ protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
{
- // TODO Auto-generated method stub
- return null;
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+ public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationSelectOkBody = null;
+ checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotSelected.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _dtxDemarcationSelectOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.select", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
+ public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
{
// TODO Auto-generated method stub
return null;
}
-
- public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
+
+ public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
{
// TODO Auto-generated method stub
return null;
}
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ return true;
+ }
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=530180&r1=530179&r2=530180
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java Wed Apr 18 15:07:01 2007
@@ -57,4 +57,10 @@
public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");
public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
+
+ // Distributed Transaction state
+ public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED");
+ public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED");
+ public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED");
+ public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
}