You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 00:24:22 UTC
svn commit: r523854 [1/3] - in
/incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/main/java/
src/main/java/org/apache/qpid/nclient/amqp/
src/main/java/org/apache/qpid/nclient/amqp/event/
src/main/java/org/apache/qpid/nclient/amqp/sample/ src/main...
Author: rajith
Date: Thu Mar 29 15:24:20 2007
New Revision: 523854
URL: http://svn.apache.org/viewvc?view=rev&rev=523854
Log:
First cut of the AMQP java API
Added:
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
Removed:
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
Modified:
incubator/qpid/branches/qpid.0-9/java/newclient/.project
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/.project
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/.project?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/.project (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/.project Thu Mar 29 15:24:20 2007
@@ -1,17 +1,14 @@
-<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>qpid-newclient</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription>
+ <name>qpid-newclient</name>
+ <comment/>
+ <projects/>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments/>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
\ No newline at end of file
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j Thu Mar 29 15:24:20 2007
@@ -16,10 +16,11 @@
# specific language governing permissions and limitations
# under the License.
#
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=DEBUG
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java Thu Mar 29 15:24:20 2007
@@ -1,11 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.nclient.amqp;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
public abstract class AMQPCallBackSupport
{
@@ -31,16 +52,16 @@
{
if(noWait)
{
- // u only need to register if u are expecting a response
- long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
- _cbMap.put(localCorrelationId, cb);
- return msg;
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ return msg;
}
else
{
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
- return msg;
+ // u only need to register if u are expecting a response
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
}
}
@@ -52,12 +73,25 @@
return msg;
}
- protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)
+ protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException
{
- if(_cbMap.contains(localCorrelationId))
+ if(_cbMap.containsKey(localCorrelationId))
{
AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
- cb.brokerResponded(methodBody);
+ if(cb == null)
+ {
+ throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody);
+ }
+ else
+ {
+ cb.setIsComplete(true);
+ cb.brokerResponded(methodBody);
+ }
+ _cbMap.remove(localCorrelationId);
+ }
+ else
+ {
+ //ignore, as this event is for another class instance
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java Thu Mar 29 15:24:20 2007
@@ -36,249 +36,317 @@
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
import org.apache.qpid.nclient.util.AMQPValidator;
/**
- * This represents the Channel class defined in the AMQP protocol.
- * This class is a finite state machine and is thread safe by design.
- * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown.
- * Only one thread can enter the methods that change state, at a given time.
- * The AMQP protocol recommends one thread per channel by design.
- *
- * A JMS Session can wrap an instance of this class.
+ * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
+ * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
+ * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
+ * channel by design.
+ *
+ * A JMS Session can wrap an instance of this class.
*/
public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
{
-private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
-
- //the channelId assigned for this channel
- private int _channelId;
- private Phase _phase;
- private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND};
- private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED};
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
- private final Lock _lock = new ReentrantLock();
- private final Condition _channelNotOpend = _lock.newCondition();
- private final Condition _channelNotClosed = _lock.newCondition();
- private final Condition _channelFlowNotResponded = _lock.newCondition();
- private final Condition _channelNotResumed = _lock.newCondition();
-
- private ChannelOpenOkBody _channelOpenOkBody;
- private ChannelCloseOkBody _channelCloseOkBody;
+ private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+
+ // the channelId assigned for this channel
+ private int _channelId;
+
+ private Phase _phase;
+
+ private AMQPState _currentState;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+ private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _channelNotOpend = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
+
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+
+ private ChannelCloseOkBody _channelCloseOkBody;
+
private ChannelFlowOkBody _channelFlowOkBody;
+
private ChannelOkBody _channelOkBody;
- public AMQPChannel(int channelId)
+ private ChannelCloseBody _channelCloseBody;
+
+ protected AMQPChannel(int channelId, Phase phase)
{
- _channelId = channelId;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ _channelId = channelId;
+ _phase = phase;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
-
- /**-------------------------------------------
+
+ /**
+ * -------------------------------------------
* API Methods
- *--------------------------------------------
+ * --------------------------------------------
*/
-
- /**
- * Opens the channel
- */
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
- {
- _lock.lock();
- try {
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- _lock.lock();
- try {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Channel Flow
- */
- public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException
- {
- _lock.lock();
- try {
- _channelFlowOkBody = null;
- if(channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
- {
- _lock.lock();
- try {
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
- }
- catch(Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+
+ /**
+ * Opens the channel
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
}
-
- /**-------------------------------------------
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Channel Flow
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.resume", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
* AMQPMethodListener methods
- *--------------------------------------------
+ * --------------------------------------------
*/
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- handleChannelClose((ChannelCloseBody)evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody)evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody)evt.getMethod();
- //In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
- }
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody)
- {
- try
- {
- _lock.lock();
- _currentState = AMQPState.CHANNEL_CLOSED;
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)
- {
- _lock.lock();
- try
- {
- handleChannelFlowState(channelFlowBody.active);
- }
- finally
- {
- _lock.unlock();
- }
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody)evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody)
+ {
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
+ }
+
+ private void releaseLocks()
+ {
+ if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+ {
+ _channelFlowNotResponded.signal();
+ }
+ else if(_currentState == AMQPState.CHANNEL_CLOSED)
+ {
+ _channelNotResumed.signal();
+ }
+ }
+
+ private void checkIfConnectionClosed()throws AMQPException
+ {
+ if (_channelCloseBody != null)
+ {
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
+ " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
+ "caused by class " + _channelCloseBody.getClassId() +
+ " and method " + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
}
-
- private void handleChannelFlowState(boolean flow)
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)
+ {
+ _lock.lock();
+ try
{
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ handleChannelFlowState(channelFlowBody.active);
}
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)
+ {
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ *
+ * There should one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class AMQPClassFactory
+{
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private AMQPConnection _amqpConnection;
+
+ public AMQPClassFactory()
+ {
+
+ }
+
+ public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url,type);
+ }
+
+ public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
+ {
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);
+ _amqpConnection = new AMQPConnection(conn);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
+ }
+ return _amqpConnection;
+ }
+
+ public AMQPChannel createChannelClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+ return amqpChannel;
+ }
+
+ public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+ }
+
+ public AMQPExchange createExchangeClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+ return amqpExchange;
+ }
+
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+ }
+
+ public AMQPQueue createQueueClass(int channel)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+ return amqpQueue;
+ }
+
+ public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+ }
+
+ public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
+ {
+ checkIfConnectionStarted();
+ AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
+
+ return amqpMessage;
+ }
+
+ public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events
+ *
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes
+ *
+ */
+ public AMQPStateManager getStateManager()
+ {
+ return null;
+ }
+}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java Thu Mar 29 15:24:20 2007
@@ -37,14 +37,14 @@
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
import org.apache.qpid.nclient.transport.TransportConnection;
import org.apache.qpid.nclient.util.AMQPValidator;
@@ -65,9 +65,8 @@
private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED,
- AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
- AMQPState.CONNECTION_OPEN, };
+ private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
+ AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
// The wait period until a server sends a respond
private long _serverTimeOut = 1000;
@@ -93,8 +92,10 @@
private ConnectionOpenOkBody _connectionOpenOkBody;
private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ private ConnectionCloseBody _connectionCloseBody;
- public AMQPConnection(TransportConnection connection)
+ protected AMQPConnection(TransportConnection connection)
{
_connection = connection;
_currentState = AMQPState.CONNECTION_UNDEFINED;
@@ -102,12 +103,14 @@
}
/**
- * ------------------------------------------- API Methods --------------------------------------------
- */
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
/**
- * Opens the TCP connection and let the formalities begin.
- */
+ * Opens the TCP connection and let the formalities begin.
+ */
public ConnectionStartBody openTCPConnection() throws AMQPException
{
_lock.lock();
@@ -119,15 +122,17 @@
_phase = _connection.connect();
// waiting for ConnectionStartBody or error in connection
- _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionStartBody,
- "The broker didn't send the ConnectionStartBody in time");
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
_currentState = AMQPState.CONNECTION_NOT_STARTED;
return _connectionStartBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error opening connection to broker", e);
}
finally
{
@@ -135,25 +140,43 @@
}
}
- public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ /**
+ * The current java broker implementation can send a connection tune body
+ * as a response to the startOk. Not sure if that is the correct behaviour.
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
{
_lock.lock();
try
{
_connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState,
- AMQPState.CONNECTION_NOT_SECURE);
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
_phase.messageSent(msg);
- _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionSecureBody,
- "The broker didn't send the ConnectionSecureBody in time");
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
+ //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+ //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
+ //_currentState = AMQPState.CONNECTION_NOT_SECURE;
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.startOk", e);
}
finally
{
@@ -162,9 +185,9 @@
}
/**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
{
_lock.lock();
@@ -173,11 +196,15 @@
_connectionTuneBody = null;
_connectionSecureBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
_connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody,
- _correlationId);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
_phase.messageSent(msg);
- _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
if (_connectionTuneBody != null)
{
_currentState = AMQPState.CONNECTION_NOT_TUNED;
@@ -193,9 +220,9 @@
throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
}
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.secureOk", e);
}
finally
{
@@ -214,10 +241,6 @@
_phase.messageSent(msg);
_currentState = AMQPState.CONNECTION_NOT_OPENED;
}
- catch (Exception e)
- {
- throw new AMQPException("XXX");
- }
finally
{
_lock.unlock();
@@ -229,20 +252,26 @@
_lock.lock();
try
{
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
_connectionOpenOkBody = null;
checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody,
- QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
- _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody,
- "The broker didn't send the ConnectionOpenOkBody in time");
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
_currentState = AMQPState.CONNECTION_OPEN;
return _connectionOpenOkBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.open", e);
}
finally
{
@@ -257,18 +286,16 @@
{
_connectionCloseOkBody = null;
checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody,
- QpidConstants.EMPTY_CORRELATION_ID);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
_phase.messageSent(msg);
_connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody,
- "The broker didn't send the ConnectionCloseOkBody in time");
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
_currentState = AMQPState.CONNECTION_CLOSED;
return _connectionCloseOkBody;
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQPException("XXX");
+ throw new AMQPException("Error in connection.close", e);
}
finally
{
@@ -277,72 +304,117 @@
}
/**
- * ------------------------------------------- AMQMethodListener methods
- * --------------------------------------------
- */
+ * ------------------------------------------- AMQMethodListener methods
+ * --------------------------------------------
+ */
public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
+ _lock.lock();
+ try
{
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signal();
- return true;
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- else if (evt.getMethod() instanceof ConnectionSecureBody)
+ finally
{
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
+ _lock.unlock();
}
- else if (evt.getMethod() instanceof ConnectionTuneBody)
+ }
+
+ private void handleClose() throws AMQPException
+ {
+ try
+ {
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
{
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotTuned.signal();
- return true;
+ throw new AMQPException("Error handling connection.close from broker", e);
}
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ }
+
+ private void checkIfConnectionClosed()throws AMQPException
+ {
+ if (_connectionCloseBody != null)
+ {
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
+ " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
+ "caused by class " + _connectionCloseBody.getClassId() +
+ " and method " + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void releaseLocks()
+ {
+ if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
{
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
_connectionNotOpened.signal();
- return true;
}
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
{
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
+ _connectionNotStarted.signal();
}
- else if (evt.getMethod() instanceof ConnectionCloseBody)
+ else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
{
- handleClose();
- return true;
+ _connectionNotSecure.signal();
}
- else
+ else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
{
- return false;
+ _connectionNotTuned.signal();
}
}
- public void handleClose() throws AMQPException
+ public Phase getPhasePipe()
{
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING);
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("XXX");
- }
- finally
- {
- _lock.unlock();
- }
+ return _phase;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java Thu Mar 29 15:24:20 2007
@@ -26,10 +26,10 @@
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
*
@@ -43,7 +43,7 @@
{
private Phase _phase;
- public AMQPExchange(int channelId,Phase phase)
+ protected AMQPExchange(int channelId,Phase phase)
{
super(channelId);
_phase = phase;
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java Thu Mar 29 15:24:20 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageEmptyBody;
import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOffsetBody;
@@ -35,10 +36,10 @@
import org.apache.qpid.framing.MessageRejectBody;
import org.apache.qpid.framing.MessageResumeBody;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
* This class represents the AMQP Message class.
@@ -59,7 +60,7 @@
private Phase _phase;
private AMQPMessageCallBack _messageCb;
- public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+ protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
{
super(channelId);
_phase = phase;
@@ -78,9 +79,9 @@
_phase.messageSent(msg);
}
- public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
{
- AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
_phase.messageSent(msg);
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java Thu Mar 29 15:24:20 2007
@@ -21,10 +21,8 @@
package org.apache.qpid.nclient.amqp;
import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOpenBody;
import org.apache.qpid.framing.MessageRecoverBody;
import org.apache.qpid.framing.MessageResumeBody;
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java Thu Mar 29 15:24:20 2007
@@ -31,10 +31,10 @@
import org.apache.qpid.framing.QueuePurgeOkBody;
import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
/**
*
@@ -48,7 +48,7 @@
{
private Phase _phase;
- public AMQPQueue(int channelId,Phase phase)
+ protected AMQPQueue(int channelId,Phase phase)
{
super(channelId);
_phase = phase;
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class registeres with the ModelPhase as a AMQMethodListener,
+ * to receive method events and then it distributes methods to other listerners
+ * using a filtering criteria. The criteria is channel id and method body class.
+ * The method listeners are added and removed dynamically
+ *
+ * <p/>
+ */
+public class QpidEventManager implements AMQPEventManager
+{
+ private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
+
+ private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
+
+ /**
+ * ------------------------------------------------
+ * methods introduced by AMQEventManager
+ * ------------------------------------------------
+ */
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ Map<Class, List> _methodListenerMap;
+ if (_channelMap.containsKey(channelId))
+ {
+ _methodListenerMap = _channelMap.get(channelId);
+
+ }
+ else
+ {
+ _methodListenerMap = new ConcurrentHashMap<Class, List>();
+ _channelMap.put(channelId, _methodListenerMap);
+ }
+
+ List<AMQPMethodListener> _listeners;
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ _listeners = _methodListenerMap.get(clazz);
+ }
+ else
+ {
+ _listeners = new ArrayList<AMQPMethodListener>();
+ _methodListenerMap.put(clazz, _listeners);
+ }
+
+ _listeners.add(l);
+
+ }
+
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ if (_channelMap.containsKey(channelId))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
+
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+ _listeners.remove(l);
+ }
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
+ */
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ if (_channelMap.containsKey(evt.getChannelId()))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+ if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+ {
+
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+ for (AMQPMethodListener l : _listeners)
+ {
+ l.methodReceived(evt);
+ }
+
+ return (_listeners.size() > 0);
+ }
+
+ }
+
+ return false;
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+
+public class QpidStateManager implements AMQPStateManager
+{
+
+ public void addListener(AMQPStateListener l) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void removeListener(AMQPStateListener l) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPEventManager
+{
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException;
+}
\ No newline at end of file
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,78 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event.
+ * Except I renamed requestId to corelationId, so I could use it both ways.
+ *
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent<M extends AMQMethodBody>
+{
+
+ private final M _method;
+
+ private final int _channelId;
+
+ /**
+ * This is the rquest id from the broker when it sent me a request
+ * when I respond I remember this id and copy this to the outgoing
+ * response.
+ */
+ private final long _correlationId;
+
+ /**
+ * I could use _correlationId, bcos when I send a request
+ * this field is blank and is only used internally. But I
+ * used a seperate field to make it more clear.
+ */
+ private long _localCorrletionId = 0;
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ _localCorrletionId = localCorrletionId;
+ }
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public long getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getLocalCorrelationId()
+ {
+ return _localCorrletionId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request Id: ").append(_correlationId);
+ buf.append("Local Correlation Id: ").append(_localCorrletionId);
+ return buf.toString();
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMethodListener
+{
+
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,57 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class MessageHelper implements AMQPMessageCallBack
+{
+
+ public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+ {
+ System.out.println("The Broker has sent a message" + messageTransferBody.toString());
+ }
+
+}