You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [13/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker...
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Thu Aug 14 20:40:49 2008
@@ -27,31 +27,32 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientMethodDispatcherImpl implements MethodDispatcher
{
-
- private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
- private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
- private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
- private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
- private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
+ private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+ private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+ private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+ private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+ private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
- private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
- private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
- private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
- private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
- private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
-
+ private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+ private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+ private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+ private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+ private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+ private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
private static interface DispatcherFactory
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session);
}
private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -62,44 +63,44 @@
_dispatcherFactories.put(ProtocolVersion.v8_0,
new DispatcherFactory()
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
{
- return new ClientMethodDispatcherImpl_8_0(stateManager);
+ return new ClientMethodDispatcherImpl_8_0(session);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_9,
new DispatcherFactory()
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
{
- return new ClientMethodDispatcherImpl_0_9(stateManager);
+ return new ClientMethodDispatcherImpl_0_9(session);
}
});
}
-
- public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+ public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("New Method Dispatcher:" + session);
+ }
+
DispatcherFactory factory = _dispatcherFactories.get(version);
- return factory.createMethodDispatcher(stateManager);
+ return factory.createMethodDispatcher(session);
}
-
+ AMQProtocolSession _session;
-
- private AMQStateManager _stateManager;
-
- public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl(AMQProtocolSession session)
{
- _stateManager = stateManager;
+ _session = session;
}
-
public AMQStateManager getStateManager()
{
- return _stateManager;
+ return _session.getStateManager();
}
public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
@@ -109,7 +110,7 @@
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
- _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -120,7 +121,7 @@
public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
{
- _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicDeliverMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -141,13 +142,13 @@
public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
{
- _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicReturnMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ _channelCloseMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -163,7 +164,7 @@
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
{
- _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _channelFlowOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -174,7 +175,7 @@
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionCloseMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -185,37 +186,37 @@
public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
{
- _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
{
- _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionRedirectMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
{
- _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionSecureMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
{
- _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionStartMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
{
- _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionTuneMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
{
- _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -431,7 +432,7 @@
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
- _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -522,7 +523,7 @@
public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
{
- return false;
+ return false;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Thu Aug 14 20:40:49 2008
@@ -26,16 +26,15 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
{
- public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
{
- super(stateManager);
+ super(session);
}
-
public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
{
return false;
@@ -148,8 +147,7 @@
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
- return false;
+ return false;
}
-
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Thu Aug 14 20:40:49 2008
@@ -24,13 +24,13 @@
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
{
- public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session)
{
- super(stateManager);
+ super(session);
}
public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,13 +25,11 @@
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +46,13 @@
}
private ConnectionCloseMethodHandler()
- { }
+ {
+ }
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
- throws AMQException
+ public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionClose frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
// does it matter
// stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -63,6 +60,8 @@
AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
AMQShortString reason = method.getReplyText();
+ AMQException error = null;
+
try
{
@@ -75,35 +74,33 @@
{
if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
{
- _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName());
-
- // todo ritchiem : Why do this here when it is going to be done in the finally block?
- session.closeProtocolSession();
+ _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
- // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
- stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
-
- throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+ error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
}
else
{
_logger.info("Connection close received with error code " + errorCode);
- throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
+ error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
}
}
}
finally
{
- // this actually closes the connection in the case where it is not an error.
+ if (error != null)
+ {
+ session.notifyError(error);
+ }
+
+ // Close the protocol Session, including any open TCP connections
session.closeProtocolSession();
- // ritchiem: Doing this though will cause any waiting connection start to be released without being able to
- // see what the cause was.
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ // Closing the session should not introduce a race condition as this thread will continue to propgate any
+ // exception in to the exceptionCaught method of the SessionHandler.
+ // Any sessionClosed event should occur after this.
}
}
-
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -24,9 +24,7 @@
import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
{
@@ -41,10 +39,10 @@
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
throws AMQException
{
- stateManager.changeState(AMQState.CONNECTION_OPEN);
+ session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
private ConnectionRedirectMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId)
throws AMQException
{
_logger.info("ConnectionRedirect frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,12 +25,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
{
@@ -41,10 +38,9 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId)
throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
SaslClient client = session.getSaslClient();
if (client == null)
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,7 +25,6 @@
import org.apache.qpid.client.security.AMQCallbackHandler;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.QpidProperties;
@@ -35,7 +34,6 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,15 +60,12 @@
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
-
ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
// For the purposes of interop, we can make the client accept the broker's version string.
@@ -145,7 +140,7 @@
throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
}
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -24,10 +24,8 @@
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
protected ConnectionTuneMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
throws AMQException
{
_logger.debug("ConnectionTune frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
final MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -65,7 +62,7 @@
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
session.setConnectionTuneParameters(params);
- stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
params.getFrameMax(),
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
private ExchangeBoundOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId)
throws AMQException
{
if (_logger.isDebugEnabled())
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
private QueueDeleteOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId)
throws AMQException
{
if (_logger.isDebugEnabled())
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Aug 14 20:40:49 2008
@@ -31,7 +31,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
/**
* @author Apache Software Foundation
@@ -44,21 +43,21 @@
*/
private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
- AbstractBytesMessage()
+ AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory, null);
}
/**
* Construct a bytes message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- AbstractBytesMessage(ByteBuffer data)
+ AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
+ super(delegateFactory, data); // this instanties a content header
+ setContentType(getMimeType());
if (_data == null)
{
@@ -72,13 +71,12 @@
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
- {
- // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, contentHeader, exchange, routingKey, data);
- getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
- }
+ AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ setContentType(getMimeType());
+ }
+
public void clearBodyImpl() throws JMSException
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Thu Aug 14 20:40:49 2008
@@ -33,7 +33,6 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
@@ -70,27 +69,28 @@
*/
private int _byteArrayRemaining = -1;
- AbstractBytesTypedMessage()
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+
+ this(delegateFactory, null);
}
/**
* Construct a stream message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- AbstractBytesTypedMessage(ByteBuffer data)
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- }
+ super(delegateFactory, data); // this instanties a content header
+ }
- AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Aug 14 20:40:49 2008
@@ -21,10 +21,7 @@
package org.apache.qpid.client.message;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.Enumeration;
-import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
@@ -32,122 +29,48 @@
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.commons.collections.map.ReferenceMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
-public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
+public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
{
- private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
- public static final String JMS_TYPE = "x-jms-type";
- protected boolean _redelivered;
protected ByteBuffer _data;
- private boolean _readableProperties = false;
protected boolean _readableMessage = false;
protected boolean _changedData = true;
- private Destination _destination;
- private JMSHeaderAdapter _headerAdapter;
- private static final boolean STRICT_AMQP_COMPLIANCE =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
- /**
- * This is 0_10 specific
- */
- private org.apache.qpidity.api.Message _010message = null;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- public void set010Message(org.apache.qpidity.api.Message m )
- {
- _010message = m;
- }
-
- public void dataChanged()
- {
- if (_010message != null)
- {
- _010message.clearData();
- try
- {
- if (_data != null)
- {
- _010message.appendData(_data.buf().slice());
- }
- else
- {
- _010message.appendData(java.nio.ByteBuffer.allocate(0));
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- /**
- * End 010 specific
- */
- public org.apache.qpidity.api.Message get010Message()
- {
- return _010message;
- }
+ private AMQMessageDelegate _delegate;
+ private boolean _redelivered;
- protected AbstractJMSMessage(ByteBuffer data)
+ protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(new BasicContentHeaderProperties());
+ _delegate = delegateFactory.createDelegate();
_data = data;
if (_data != null)
{
_data.acquire();
}
- _readableProperties = false;
+
_readableMessage = (data != null);
_changedData = (data == null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
- protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- this(contentHeader, deliveryTag);
-
- Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- AMQDestination dest;
-
- if (AMQDestination.QUEUE_TYPE.equals(type))
- {
- dest = new AMQQueue(exchange, routingKey, routingKey);
- }
- else if (AMQDestination.TOPIC_TYPE.equals(type))
- {
- dest = new AMQTopic(exchange, routingKey, null);
- }
- else
- {
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- }
- // Destination dest = AMQDestination.createDestination(url);
- setJMSDestination(dest);
+ _delegate = delegate;
_data = data;
if (_data != null)
@@ -159,126 +82,82 @@
}
- protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+ public String getJMSMessageID() throws JMSException
{
- super(contentHeader, deliveryTag);
- _readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ return _delegate.getJMSMessageID();
}
- public String getJMSMessageID() throws JMSException
+ public void setJMSMessageID(String messageId) throws JMSException
{
- return getContentHeaderProperties().getMessageIdAsString();
+ _delegate.setJMSMessageID(messageId);
}
- public void setJMSMessageID(String messageId) throws JMSException
+ public void setJMSMessageID(UUID messageId) throws JMSException
{
- getContentHeaderProperties().setMessageId(messageId);
+ _delegate.setJMSMessageID(messageId);
}
+
public long getJMSTimestamp() throws JMSException
{
- return getContentHeaderProperties().getTimestamp();
+ return _delegate.getJMSTimestamp();
}
public void setJMSTimestamp(long timestamp) throws JMSException
{
- getContentHeaderProperties().setTimestamp(timestamp);
+ _delegate.setJMSTimestamp(timestamp);
}
public byte[] getJMSCorrelationIDAsBytes() throws JMSException
{
- return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+ return _delegate.getJMSCorrelationIDAsBytes();
}
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
{
- getContentHeaderProperties().setCorrelationId(new String(bytes));
+ _delegate.setJMSCorrelationIDAsBytes(bytes);
}
public void setJMSCorrelationID(String correlationId) throws JMSException
{
- getContentHeaderProperties().setCorrelationId(correlationId);
+ _delegate.setJMSCorrelationID(correlationId);
}
public String getJMSCorrelationID() throws JMSException
{
- return getContentHeaderProperties().getCorrelationIdAsString();
+ return _delegate.getJMSCorrelationID();
}
public Destination getJMSReplyTo() throws JMSException
{
- String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
- if (replyToEncoding == null)
- {
- return null;
- }
- else
- {
- Destination dest = (Destination) _destinationCache.get(replyToEncoding);
- if (dest == null)
- {
- try
- {
- BindingURL binding = new AMQBindingURL(replyToEncoding);
- dest = AMQDestination.createDestination(binding);
- }
- catch (URISyntaxException e)
- {
- throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
- }
-
- _destinationCache.put(replyToEncoding, dest);
- }
-
- return dest;
- }
+ return _delegate.getJMSReplyTo();
}
public void setJMSReplyTo(Destination destination) throws JMSException
{
- if (destination == null)
- {
- throw new IllegalArgumentException("Null destination not allowed");
- }
-
- if (!(destination instanceof AMQDestination))
- {
- throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
- }
-
- final AMQDestination amqd = (AMQDestination) destination;
-
- final AMQShortString encodedDestination = amqd.getEncodedName();
- _destinationCache.put(encodedDestination, destination);
- getContentHeaderProperties().setReplyTo(encodedDestination);
+ _delegate.setJMSReplyTo(destination);
}
public Destination getJMSDestination() throws JMSException
{
- return _destination;
+ return _delegate.getJMSDestination();
}
public void setJMSDestination(Destination destination)
{
- _destination = destination;
+ _delegate.setJMSDestination(destination);
}
public int getJMSDeliveryMode() throws JMSException
{
- return getContentHeaderProperties().getDeliveryMode();
+ return _delegate.getJMSDeliveryMode();
}
public void setJMSDeliveryMode(int i) throws JMSException
{
- getContentHeaderProperties().setDeliveryMode((byte) i);
+ _delegate.setJMSDeliveryMode(i);
}
- public BasicContentHeaderProperties getContentHeaderProperties()
- {
- return (BasicContentHeaderProperties) _contentHeaderProperties;
- }
public boolean getJMSRedelivered() throws JMSException
{
@@ -290,318 +169,180 @@
_redelivered = b;
}
+
public String getJMSType() throws JMSException
{
- return getContentHeaderProperties().getTypeAsString();
+ return _delegate.getJMSType();
}
public void setJMSType(String string) throws JMSException
{
- getContentHeaderProperties().setType(string);
+ _delegate.setJMSType(string);
}
public long getJMSExpiration() throws JMSException
{
- return getContentHeaderProperties().getExpiration();
+ return _delegate.getJMSExpiration();
}
public void setJMSExpiration(long l) throws JMSException
{
- getContentHeaderProperties().setExpiration(l);
+ _delegate.setJMSExpiration(l);
}
public int getJMSPriority() throws JMSException
{
- return getContentHeaderProperties().getPriority();
+ return _delegate.getJMSPriority();
}
public void setJMSPriority(int i) throws JMSException
{
- getContentHeaderProperties().setPriority((byte) i);
- }
-
- public void clearProperties() throws JMSException
- {
- getJmsHeaders().clear();
-
- _readableProperties = false;
- }
-
- public void clearBody() throws JMSException
- {
- clearBodyImpl();
- _readableMessage = false;
+ _delegate.setJMSPriority(i);
}
- public boolean propertyExists(AMQShortString propertyName) throws JMSException
- {
- return getJmsHeaders().propertyExists(propertyName);
- }
public boolean propertyExists(String propertyName) throws JMSException
{
- return getJmsHeaders().propertyExists(propertyName);
+ return _delegate.propertyExists(propertyName);
}
- public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+ public boolean getBooleanProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBoolean(propertyName);
+ return _delegate.getBooleanProperty(s);
}
- public boolean getBooleanProperty(String propertyName) throws JMSException
+ public byte getByteProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBoolean(propertyName);
+ return _delegate.getByteProperty(s);
}
- public byte getByteProperty(String propertyName) throws JMSException
+ public short getShortProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getByte(propertyName);
+ return _delegate.getShortProperty(s);
}
- public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+ public int getIntProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBytes(propertyName);
+ return _delegate.getIntProperty(s);
}
- public short getShortProperty(String propertyName) throws JMSException
+ public long getLongProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getShort(propertyName);
+ return _delegate.getLongProperty(s);
}
- public int getIntProperty(String propertyName) throws JMSException
+ public float getFloatProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getInteger(propertyName);
+ return _delegate.getFloatProperty(s);
}
- public long getLongProperty(String propertyName) throws JMSException
+ public double getDoubleProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getLong(propertyName);
- }
-
- public float getFloatProperty(String propertyName) throws JMSException
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getFloat(propertyName);
+ return _delegate.getDoubleProperty(s);
}
- public double getDoubleProperty(String propertyName) throws JMSException
+ public String getStringProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getDouble(propertyName);
+ return _delegate.getStringProperty(s);
}
- public String getStringProperty(String propertyName) throws JMSException
+ public Object getObjectProperty(final String s)
+ throws JMSException
{
- //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
- if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
- {
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
- }
- else
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getString(propertyName);
- }
+ return _delegate.getObjectProperty(s);
}
- public Object getObjectProperty(String propertyName) throws JMSException
+ public Enumeration getPropertyNames()
+ throws JMSException
{
- return getJmsHeaders().getObject(propertyName);
+ return _delegate.getPropertyNames();
}
- public Enumeration getPropertyNames() throws JMSException
+ public void setBooleanProperty(final String s, final boolean b)
+ throws JMSException
{
- return getJmsHeaders().getPropertyNames();
+ _delegate.setBooleanProperty(s, b);
}
- public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
+ public void setByteProperty(final String s, final byte b)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBoolean(propertyName, b);
+ _delegate.setByteProperty(s, b);
}
- public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ public void setShortProperty(final String s, final short i)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBoolean(propertyName, b);
+ _delegate.setShortProperty(s, i);
}
- public void setByteProperty(String propertyName, byte b) throws JMSException
+ public void setIntProperty(final String s, final int i)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setByte(propertyName, new Byte(b));
+ _delegate.setIntProperty(s, i);
}
- public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
+ public void setLongProperty(final String s, final long l)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBytes(propertyName, bytes);
+ _delegate.setLongProperty(s, l);
}
- public void setShortProperty(String propertyName, short i) throws JMSException
+ public void setFloatProperty(final String s, final float v)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setShort(propertyName, new Short(i));
+ _delegate.setFloatProperty(s, v);
}
- public void setIntProperty(String propertyName, int i) throws JMSException
+ public void setDoubleProperty(final String s, final double v)
+ throws JMSException
{
- checkWritableProperties();
- JMSHeaderAdapter.checkPropertyName(propertyName);
- super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
+ _delegate.setDoubleProperty(s, v);
}
- public void setLongProperty(String propertyName, long l) throws JMSException
+ public void setStringProperty(final String s, final String s1)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setLong(propertyName, new Long(l));
+ _delegate.setStringProperty(s, s1);
}
- public void setFloatProperty(String propertyName, float f) throws JMSException
+ public void setObjectProperty(final String s, final Object o)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setFloat(propertyName, new Float(f));
+ _delegate.setObjectProperty(s, o);
}
- public void setDoubleProperty(String propertyName, double v) throws JMSException
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
- checkWritableProperties();
- getJmsHeaders().setDouble(propertyName, new Double(v));
- }
- public void setStringProperty(String propertyName, String value) throws JMSException
+ public void clearProperties() throws JMSException
{
- checkWritableProperties();
- JMSHeaderAdapter.checkPropertyName(propertyName);
- super.setLongStringProperty(new AMQShortString(propertyName), value);
+ _delegate.clearProperties();
}
- public void setObjectProperty(String propertyName, Object object) throws JMSException
+ public void clearBody() throws JMSException
{
- checkWritableProperties();
- getJmsHeaders().setObject(propertyName, object);
- }
+ clearBodyImpl();
+ _readableMessage = false;
- protected void removeProperty(AMQShortString propertyName) throws JMSException
- {
- getJmsHeaders().remove(propertyName);
}
- protected void removeProperty(String propertyName) throws JMSException
- {
- getJmsHeaders().remove(propertyName);
- }
public void acknowledgeThis() throws JMSException
{
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
+ _delegate.acknowledgeThis();
}
public void acknowledge() throws JMSException
{
- if (_session != null)
- {
- _session.acknowledge();
- }
+ _delegate.acknowledge();
}
/**
@@ -617,12 +358,9 @@
*/
public abstract String toBodyString() throws JMSException;
- public String getMimeType()
- {
- return getMimeTypeAsShortString().toString();
- }
+ protected abstract String getMimeType();
+
- public abstract AMQShortString getMimeTypeAsShortString();
public String toString()
{
@@ -640,16 +378,23 @@
buf.append("\nJMS Destination: ").append(getJMSDestination());
buf.append("\nJMS Type: ").append(getJMSType());
buf.append("\nJMS MessageID: ").append(getJMSMessageID());
- buf.append("\nAMQ message number: ").append(_deliveryTag);
+ buf.append("\nAMQ message number: ").append(getDeliveryTag());
buf.append("\nProperties:");
- if (getJmsHeaders().isEmpty())
+ final Enumeration propertyNames = getPropertyNames();
+ if (!propertyNames.hasMoreElements())
{
buf.append("<NONE>");
}
else
{
- buf.append('\n').append(getJmsHeaders().getHeaders());
+ buf.append('\n');
+ while(propertyNames.hasMoreElements())
+ {
+ String propertyName = (String) propertyNames.nextElement();
+ buf.append(propertyName).append(":\t").append(getObjectProperty(propertyName));
+ }
+
}
return buf.toString();
@@ -660,14 +405,10 @@
}
}
- public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
- {
- getContentHeaderProperties().setHeaders(messageProperties);
- }
- public JMSHeaderAdapter getJmsHeaders()
+ public AMQMessageDelegate getDelegate()
{
- return _headerAdapter;
+ return _delegate;
}
public ByteBuffer getData()
@@ -698,25 +439,6 @@
}
}
- protected void checkWritableProperties() throws MessageNotWriteableException
- {
- if (_readableProperties)
- {
- throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
- }
- _contentHeaderProperties.updated();
- }
-
- public boolean isReadable()
- {
- return _readableMessage;
- }
-
- public boolean isWritable()
- {
- return !_readableMessage;
- }
-
public void reset()
{
if (!_changedData)
@@ -726,7 +448,6 @@
else
{
_data.flip();
- dataChanged();
_changedData = false;
}
}
@@ -748,4 +469,66 @@
_changedData = false;
}
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _delegate.setAMQSession(s);
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _delegate.getAMQSession();
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _delegate.getDeliveryTag();
+ }
+
+ /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
+ public void prepareForSending() throws JMSException
+ {
+ }
+
+
+ public void setContentType(String contentType)
+ {
+ _delegate.setContentType(contentType);
+ }
+
+ public String getContentType()
+ {
+ return _delegate.getContentType();
+ }
+
+ public void setEncoding(String encoding)
+ {
+ _delegate.setEncoding(encoding);
+ }
+
+ public String getEncoding()
+ {
+ return _delegate.getEncoding();
+ }
+
+ public String getReplyToString()
+ {
+ return _delegate.getReplyToString();
+ }
+
+ protected void removeProperty(final String propertyName) throws JMSException
+ {
+ _delegate.removeProperty(propertyName);
+ }
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -27,9 +27,9 @@
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,17 +38,11 @@
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
- AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException;
-
protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey,
List bodies) throws AMQException
@@ -105,23 +99,28 @@
.remaining());
}
- return createMessage(messageNbr, data, exchange, routingKey,
- (BasicContentHeaderProperties) contentHeader.properties);
+ AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
+ (BasicContentHeaderProperties) contentHeader.properties,
+ exchange, routingKey);
+
+ return createMessage(delegate, data);
}
+ protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException;
+
+
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, String replyToURL) throws AMQException
+ java.nio.ByteBuffer body) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- // we optimise the non-fragmented case to avoid copying
- if ((bodies != null))
+
+ if (body != null)
{
- data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+ data = ByteBuffer.wrap(body);
}
- else // bodies == null
+ else // body == null
{
data = ByteBuffer.allocate(0);
}
@@ -131,40 +130,13 @@
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
.remaining());
}
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
// set the properties of this message
MessageProperties mprop = (MessageProperties) contentHeader[0];
DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
- props.setContentType(mprop.getContentType());
- props.setCorrelationId(asString(mprop.getCorrelationId()));
- String encoding = mprop.getContentEncoding();
- if (encoding != null && !encoding.equals(""))
- {
- props.setEncoding(encoding);
- }
- if (devprop.hasDeliveryMode())
- {
- props.setDeliveryMode((byte) devprop.getDeliveryMode().getValue());
- }
- props.setExpiration(devprop.getExpiration());
- UUID mid = mprop.getMessageId();
- props.setMessageId(mid == null ? null : "ID:" + mid.toString());
- if (devprop.hasPriority())
- {
- props.setPriority((byte) devprop.getPriority().getValue());
- }
- props.setReplyTo(replyToURL);
- props.setTimestamp(devprop.getTimestamp());
- String type = null;
- Map<String,Object> map = mprop.getApplicationHeaders();
- if (map != null)
- {
- type = (String) map.get(AbstractJMSMessage.JMS_TYPE);
- }
- props.setType(type);
- props.setUserId(asString(mprop.getUserId()));
- props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
+
+ AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(mprop, devprop, messageNbr);
+
+ AbstractJMSMessage message = createMessage(delegate, data);
return message;
}
@@ -192,12 +164,11 @@
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies,
- String replyToURL)
+ java.nio.ByteBuffer body)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
+ create010MessageWithBody(messageNbr, contentHeader, body);
msg.setJMSRedelivered(redelivered);
msg.receivedFromServer();
return msg;
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Aug 14 20:40:49 2008
@@ -33,46 +33,47 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
public static final String MIME_TYPE = "application/octet-stream";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
- public JMSBytesMessage()
+
+ public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory,null);
+
}
/**
* Construct a bytes message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- JMSBytesMessage(ByteBuffer data)
+ JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
+
+ super(delegateFactory, data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+ super(delegate, data);
}
+
public void reset()
{
super.reset();
_readableMessage = true;
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public long getBodyLength() throws JMSException
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,21 +25,18 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSBytesMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSBytesMessage();
+ return new JMSBytesMessage(delegateFactory);
}
// 0_10 specific
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Thu Aug 14 20:40:49 2008
@@ -467,7 +467,7 @@
return getPropertyNames();
}
- protected static void checkPropertyName(CharSequence propertyName)
+ protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
{
@@ -481,7 +481,7 @@
checkIdentiferFormat(propertyName);
}
- protected static void checkIdentiferFormat(CharSequence propertyName)
+ protected void checkIdentiferFormat(CharSequence propertyName)
{
// JMS requirements 3.5.1 Property Names
// Identifiers:
@@ -492,14 +492,14 @@
// '_' and '$'. An identifier part character is any character for which the
// method Character.isJavaIdentifierPart returns true.
// - Identifiers cannot be the names NULL, TRUE, or FALSE.
-// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
// ESCAPE.
-// � Identifiers are either header field references or property references. The
+// Identifiers are either header field references or property references. The
// type of a property value in a message selector corresponds to the type
// used to set the property. If a property that does not exist in a message is
// referenced, its value is NULL. The semantics of evaluating NULL values
-// in a selector are described in Section 3.8.1.2, �Null Values.�
-// � The conversions that apply to the get methods for properties do not
+// in a selector are described in Section 3.8.1.2, Null Values.
+// The conversions that apply to the get methods for properties do not
// apply when a property is used in a message selector expression. For
// example, suppose you set a property as a string value, as in the
// following:
@@ -507,8 +507,8 @@
// The following expression in a message selector would evaluate to false,
// because a string cannot be used in an arithmetic expression:
// "NumberOfOrders > 1"
-// � Identifiers are case sensitive.
-// � Message header field references are restricted to JMSDeliveryMode,
+// Identifiers are case sensitive.
+// Message header field references are restricted to JMSDeliveryMode,
// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
// null and if so are treated as a NULL value.
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Aug 14 20:40:49 2008
@@ -24,7 +24,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.slf4j.Logger;
@@ -44,18 +43,19 @@
private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
public static final String MIME_TYPE = "jms/map-message";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private Map<String, Object> _map = new HashMap<String, Object>();
- public JMSMapMessage() throws JMSException
+ public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- this(null);
+ this(delegateFactory, null);
}
- JMSMapMessage(ByteBuffer data) throws JMSException
+ JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
{
- super(data); // this instantiates a content header
+
+ super(delegateFactory, data); // this instantiates a content header
if(data != null)
{
populateMapFromData();
@@ -63,10 +63,10 @@
}
- JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
- ByteBuffer data) throws AMQException
+ JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
try
{
populateMapFromData();
@@ -79,14 +79,15 @@
}
+
public String toBodyString() throws JMSException
{
return _map == null ? "" : _map.toString();
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public ByteBuffer getData()
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,21 +25,18 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSMapMessage();
+ return new JMSMapMessage(delegateFactory);
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSMapMessage(delegate, data);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Aug 14 20:40:49 2008
@@ -37,63 +37,65 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
public static final String MIME_TYPE = "application/java-object-stream";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
+ * @param delegateFactory
*/
- public JMSObjectMessage()
+ public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory, null);
}
- private JMSObjectMessage(ByteBuffer data)
+ private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data);
+ super(delegateFactory, data);
if (data == null)
{
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
- getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+ setContentType(getMimeType());
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
- ByteBuffer data) throws AMQException
- {
- super(messageNbr, contentHeader, exchange, routingKey, data);
- }
+
+ JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ }
+
public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
_data.release();
+ _data = null;
}
- _data = null;
+
}
public String toBodyString() throws JMSException
{
- return toString(_data);
+ return String.valueOf(getObject());
}
- public AMQShortString getMimeTypeAsShortString()
+ public String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public void setObject(Serializable serializable) throws JMSException
@@ -172,26 +174,4 @@
catch (IOException ignore)
{ }
}
-
- private static String toString(ByteBuffer data)
- {
- if (data == null)
- {
- return null;
- }
-
- int pos = data.position();
- try
- {
- return data.getString(Charset.forName("UTF8").newDecoder());
- }
- catch (CharacterCodingException e)
- {
- return null;
- }
- finally
- {
- data.position(pos);
- }
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,20 +25,17 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSObjectMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSObjectMessage();
+ return new JMSObjectMessage(delegateFactory);
}
}