You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/04/20 01:08:20 UTC

svn commit: r530586 - in /incubator/qpid/branches/client_restructure/java: client/ newclient/src/main/java/org/apache/qpid/nclient/amqp/ newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/

Author: rajith
Date: Thu Apr 19 16:08:19 2007
New Revision: 530586

URL: http://svn.apache.org/viewvc?view=rev&rev=530586
Log:
added cordination support

Added:
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
Modified:
    incubator/qpid/branches/client_restructure/java/client/pom.xml
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java

Modified: incubator/qpid/branches/client_restructure/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/client/pom.xml?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- incubator/qpid/branches/client_restructure/java/client/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/client/pom.xml Thu Apr 19 16:08:19 2007
@@ -53,6 +53,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jta_1.1_spec</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java Thu Apr 19 16:08:19 2007
@@ -21,32 +21,27 @@
 package org.apache.qpid.nclient.amqp;
 
 import org.apache.qpid.framing.DtxCoordinationCommitBody;
-import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
 import org.apache.qpid.framing.DtxCoordinationForgetBody;
-import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
 import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
 import org.apache.qpid.framing.DtxCoordinationPrepareBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
 import org.apache.qpid.framing.DtxCoordinationRecoverBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
 import org.apache.qpid.framing.DtxCoordinationRollbackBody;
-import org.apache.qpid.framing.DtxCoordinationRollbackOkBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
 import org.apache.qpid.nclient.core.AMQPException;
 
 public interface AMQPDtxCoordination
 {
-	public DtxCoordinationCommitOkBody commit(DtxCoordinationCommitBody dtxCoordinationCommitBody) throws AMQPException;
+	public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException;
 	
-	public DtxCoordinationForgetOkBody forget(DtxCoordinationForgetBody dtxCoordinationForgetBody) throws AMQPException;
+	public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException;
 	
-	public DtxCoordinationGetTimeoutOkBody getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody) throws AMQPException;
+	public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException;
 	
-	public DtxCoordinationPrepareOkBody prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody) throws AMQPException;
+	public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException;
 	
-	public DtxCoordinationRecoverOkBody recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody) throws AMQPException;
+	public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException;
 	
-	public DtxCoordinationRollbackOkBody getTimeOut(DtxCoordinationRollbackBody dtxCoordinationRollbackBody) throws AMQPException;
+	public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException;
 	
-	//public DtxCoordinationSetTimeoutOkBody getTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody) throws AMQPException;
+	public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException;
 }

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java?view=auto&rev=530586
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java Thu Apr 19 16:08:19 2007
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.DtxCoordinationCommitBody;
+import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
+import org.apache.qpid.framing.DtxCoordinationForgetBody;
+import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
+import org.apache.qpid.framing.DtxCoordinationRollbackBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPDtxCoordination;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination
+{
+	private Phase _phase;
+
+	protected QpidAMQPDtxCoordination(int channelId,Phase phase)
+	{
+		super(channelId);
+		_phase = phase;
+	}
+
+	public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb);
+		_phase.messageSent(msg);
+	}
+	
+	public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb);
+		_phase.messageSent(msg);
+	}
+	
+	/**-------------------------------------------
+     * AMQPMethodListener methods
+     *--------------------------------------------
+     */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+    {
+    	long localCorrelationId = evt.getLocalCorrelationId();
+    	AMQMethodBody methodBody = evt.getMethod(); 
+    	if ( methodBody instanceof DtxCoordinationCommitOkBody ||
+    		 methodBody instanceof DtxCoordinationForgetOkBody	  ||
+    		 methodBody instanceof DtxCoordinationGetTimeoutOkBody  ||
+    		 methodBody instanceof DtxCoordinationPrepareOkBody	  ||
+    		 methodBody instanceof DtxCoordinationRecoverOkBody	  
+    	    )
+    	{
+    		invokeCallBack(localCorrelationId,methodBody);
+    		return true;
+    	}    	
+    	else
+    	{
+    		return false;
+    	}
+    }	
+
+}

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java?view=diff&rev=530586&r1=530585&r2=530586
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java Thu Apr 19 16:08:19 2007
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.nclient.amqp.qpid;
 
 import java.util.concurrent.locks.Condition;
@@ -52,10 +72,17 @@
 	
 	private final Condition _dtxNotSelected = _lock.newCondition();
 
-	private final Condition _channelNotClosed = _lock.newCondition();
+	private final Condition _dtxNotStarted = _lock.newCondition();
+	
+	// maybe it needs a better name
+	private final Condition _dtxNotEnd = _lock.newCondition();
 	
 	private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
 	
+	private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody;
+	
+	private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody;
+	
 	protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
 	{
 		_channelId = channelId;
@@ -77,14 +104,14 @@
 		{
 			_dtxDemarcationSelectOkBody = null;
 			checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
 			_phase.messageSent(msg);
 
-			//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			//_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS);
 			_dtxNotSelected.await();
 			AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
-			notifyState(AMQPState.CHANNEL_OPENED);
-			_currentState = AMQPState.CHANNEL_OPENED;
+			notifyState(AMQPState.DTX_NOT_STARTED);
+			_currentState = AMQPState.DTX_NOT_STARTED;
 			return _dtxDemarcationSelectOkBody;
 		}
 		catch (Exception e)
@@ -99,14 +126,56 @@
 
 	public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
 	{
-		// TODO Auto-generated method stub
-		return null;
+		_lock.lock();
+		try
+		{
+			_dtxDemarcationStartOkBody = null;
+			checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_dtxNotStarted.await();
+			AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time");
+			notifyState(AMQPState.DTX_STARTED);
+			_currentState = AMQPState.DTX_STARTED;
+			return _dtxDemarcationStartOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in dtx.start", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
 	}
 	
 	public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
 	{
-		// TODO Auto-generated method stub
-		return null;
+		_lock.lock();
+		try
+		{
+			_dtxDemarcationEndOkBody = null;
+			checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_dtxNotEnd.await();
+			AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time");
+			notifyState(AMQPState.DTX_END);
+			_currentState = AMQPState.DTX_END;
+			return _dtxDemarcationEndOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in dtx.start", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
 	}
 	
 	/**
@@ -116,7 +185,36 @@
 	 */
 	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
 	{
-		return true;
+		_lock.lock();
+		try
+		{
+			if (evt.getMethod() instanceof DtxDemarcationSelectOkBody)
+			{
+				_dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+				_dtxNotSelected.signal();
+				return true;
+			}
+			else if (evt.getMethod() instanceof DtxDemarcationStartOkBody)
+			{
+				_dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod();
+				_dtxNotStarted.signal();
+				return true;
+			}
+			else if (evt.getMethod() instanceof DtxDemarcationEndOkBody)
+			{
+				_dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+				_dtxNotEnd.signal();
+				return true;
+			}
+			else
+			{
+				return false;
+			}
+		}
+		finally
+		{
+			_lock.unlock();
+		}
 	}
 
 	private void notifyState(AMQPState newState) throws AMQPException