You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [12/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles an incoming Connection.CloseOk method: records the closed state in the state
+ * manager and then closes the protocol session.
+ * <p>
+ * The class keeps no per-connection state; one shared instance is handed out by
+ * {@link #getInstance()}.
+ */
+public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionCloseOkMethodHandler.class);
+
+    /** The one shared handler instance. */
+    private static final ConnectionCloseOkMethodHandler _instance = new ConnectionCloseOkMethodHandler();
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private ConnectionCloseOkMethodHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ConnectionCloseOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Logs receipt of the method, switches the state manager to
+     * {@link AMQState#CONNECTION_CLOSED} and closes the protocol session.
+     * <p>
+     * Any exception raised by the state change or by closing the session is caught and
+     * logged here; it is not propagated to the caller.
+     *
+     * @param stateManager     state manager whose state is moved to CONNECTION_CLOSED
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session that is closed
+     * @param evt              the received method event (its body is not inspected)
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+    {
+        // TODO: should this do more than log the method and close the session?
+        _logger.info("Received Connection-close-ok");
+
+        try
+        {
+            stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            protocolSession.closeSession();
+        }
+        catch (Exception failure)
+        {
+            // Deliberately broad: a failure while tearing down must not escape this handler.
+            _logger.error("Error closing protocol session: " + failure, failure);
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles Connection.Open: stores a context key on the protocol session, moves the
+ * connection to the open state and replies with Connection.OpenOk.
+ * <p>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
+{
+    /** The one shared handler instance. */
+    private static final ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler();
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private ConnectionOpenMethodHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ConnectionOpenMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Builds a fallback context key from the current wall-clock time in milliseconds.
+     * <p>
+     * NOTE(review): two calls within the same millisecond return the same value; confirm
+     * whether callers rely on the key being unique.
+     *
+     * @return the current time in milliseconds, as a decimal string
+     */
+    private static String generateClientID()
+    {
+        final long nowMillis = System.currentTimeMillis();
+        return Long.toString(nowMillis);
+    }
+
+    /**
+     * Uses the virtual host named in the method body as the session's context key, or a
+     * generated key when none was supplied, then switches the state manager to
+     * {@link AMQState#CONNECTION_OPEN} and writes the Connection.OpenOk frame.
+     *
+     * @param stateManager     state manager whose state is moved to CONNECTION_OPEN
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session that receives the context key and the reply frame
+     * @param evt              event carrying the Connection.Open body
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+    {
+        final ConnectionOpenBody openBody = evt.getMethod();
+
+        // FIXME: the virtual host must be validated by the server before open-ok is sent.
+        // See Spec (0.8.2), Section 3.1.2 Virtual Hosts.
+        final String contextKey = (openBody.virtualHost != null) ? openBody.virtualHost : generateClientID();
+        protocolSession.setContextKey(contextKey);
+
+        // The reply frame is built before the state change, and written after it.
+        final AMQFrame openOk = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+        stateManager.changeState(AMQState.CONNECTION_OPEN);
+        protocolSession.writeFrame(openOk);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,115 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+
+/**
+ * Handles Connection.SecureOk: passes the client's SASL response to the authentication
+ * manager and advances the connection according to the result.
+ * <ul>
+ * <li>ERROR - state becomes CONNECTION_CLOSING, a Connection.Close frame carrying
+ * NOT_ALLOWED is written and the SASL server is disposed of;</li>
+ * <li>SUCCESS - state becomes CONNECTION_NOT_TUNED, a Connection.Tune frame is written and
+ * the SASL server is disposed of;</li>
+ * <li>CONTINUE - state becomes CONNECTION_NOT_AUTH and a further Connection.Secure
+ * challenge is written; the SASL server is kept for the next round.</li>
+ * </ul>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody>
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionSecureOkMethodHandler.class);
+
+    /** The one shared handler instance. */
+    private static ConnectionSecureOkMethodHandler _instance = new ConnectionSecureOkMethodHandler();
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ConnectionSecureOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private ConnectionSecureOkMethodHandler()
+    {
+    }
+
+    /**
+     * Continues the SASL exchange using the SASL server stored on the session.
+     *
+     * @param stateManager     state manager updated according to the authentication result
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session holding the SASL server; receives the reply frame
+     * @param evt              event carrying the Connection.SecureOk body
+     * @throws AMQException if the session has no SASL server
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+    {
+        ConnectionSecureOkBody body = evt.getMethod();
+
+        AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
+        SaslServer ss = protocolSession.getSaslServer();
+        if (ss == null)
+        {
+            throw new AMQException("No SASL context set up in session");
+        }
+
+        AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+        switch (authResult.status)
+        {
+            case ERROR:
+                // Throwing here would violate the protocol; the client must be sent a Close instead.
+                _logger.info("Authentication failed");
+                stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
+                                                                    AMQConstant.NOT_ALLOWED.getName(),
+                                                                    ConnectionCloseBody.CLASS_ID,
+                                                                    ConnectionCloseBody.METHOD_ID);
+                protocolSession.writeFrame(close);
+                disposeSaslServer(protocolSession);
+                break;
+            case SUCCESS:
+                // The authorization id is read before the SASL server is disposed of below.
+                _logger.info("Connected as: " + ss.getAuthorizationID());
+                stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
+                                                                  ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
+                                                                  HeartbeatConfig.getInstance().getDelay());
+                protocolSession.writeFrame(tune);
+                disposeSaslServer(protocolSession);
+                break;
+            case CONTINUE:
+                stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                protocolSession.writeFrame(challenge);
+                break;
+        }
+    }
+
+    /**
+     * Detaches the SASL server from the session, if one is set, and disposes of it.
+     * A failure to dispose is logged, including its stack trace, and otherwise ignored.
+     *
+     * @param ps session whose SASL server is released
+     */
+    private void disposeSaslServer(AMQProtocolSession ps)
+    {
+        SaslServer ss = ps.getSaslServer();
+        if (ss != null)
+        {
+            // Clear the session reference first so it is gone even if dispose() fails.
+            ps.setSaslServer(null);
+            try
+            {
+                ss.dispose();
+            }
+            catch (SaslException e)
+            {
+                // Pass the exception as the throwable so the stack trace is not lost.
+                _logger.error("Error disposing of Sasl server: " + e, e);
+            }
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+/**
+ * Handles Connection.StartOk: creates a SASL server for the mechanism the client selected,
+ * stores it on the session and runs the first authentication step.
+ * <ul>
+ * <li>ERROR - the SASL server is disposed of and an {@link AMQException} is thrown;</li>
+ * <li>SUCCESS - state becomes CONNECTION_NOT_TUNED and a Connection.Tune frame is written;</li>
+ * <li>CONTINUE - state becomes CONNECTION_NOT_AUTH and a Connection.Secure challenge is written.</li>
+ * </ul>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class);
+
+    /** The one shared handler instance. */
+    private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler();
+
+    /** Frame size used when the configuration has no "advanced.framesize" entry. */
+    private static final int DEFAULT_FRAME_SIZE = 65536;
+
+    /**
+     * @return the shared handler instance
+     */
+    public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
+    {
+        return _instance;
+    }
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private ConnectionStartOkMethodHandler()
+    {
+    }
+
+    /**
+     * Starts the SASL exchange for a newly started connection.
+     *
+     * @param stateManager     state manager updated according to the authentication result
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session that receives the SASL server and the reply frame
+     * @param evt              event carrying the Connection.StartOk body
+     * @throws AMQException if authentication fails outright, or if a {@link SaslException}
+     *                      is raised inside the try block (wrapped as the cause)
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+    {
+        final ConnectionStartOkBody body = evt.getMethod();
+        _logger.info("SASL Mechanism selected: " + body.mechanism);
+        _logger.info("Locale selected: " + body.locale);
+
+        AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
+
+        SaslServer ss = null;
+        try
+        {
+            ss = authMgr.createSaslServer(body.mechanism, protocolSession.getLocalFQDN());
+            protocolSession.setSaslServer(ss);
+
+            AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+
+            switch (authResult.status)
+            {
+                case ERROR:
+                    // Release the SASL server attached above; without this a failed
+                    // authentication would leave it set on the session and undisposed.
+                    disposeSaslServer(protocolSession);
+                    throw new AMQException("Authentication failed");
+                case SUCCESS:
+                    _logger.info("Connected as: " + ss.getAuthorizationID());
+                    stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
+                                                                      HeartbeatConfig.getInstance().getDelay());
+                    protocolSession.writeFrame(tune);
+                    break;
+                case CONTINUE:
+                    // The SASL server stays on the session for the next step of the exchange.
+                    stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                    protocolSession.writeFrame(challenge);
+                    break;
+            }
+        }
+        catch (SaslException e)
+        {
+            disposeSaslServer(protocolSession);
+            throw new AMQException("SASL error: " + e, e);
+        }
+    }
+
+    /**
+     * Detaches the SASL server from the session, if one is set, and disposes of it.
+     * A failure to dispose is logged, including its stack trace, and otherwise ignored.
+     *
+     * @param ps session whose SASL server is released
+     */
+    private void disposeSaslServer(AMQProtocolSession ps)
+    {
+        SaslServer ss = ps.getSaslServer();
+        if (ss != null)
+        {
+            // Clear the session reference first so it is gone even if dispose() fails.
+            ps.setSaslServer(null);
+            try
+            {
+                ss.dispose();
+            }
+            catch (SaslException e)
+            {
+                // Pass the exception as the throwable so the stack trace is not lost.
+                _logger.error("Error disposing of Sasl server: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * Reads the "advanced.framesize" setting from the application configuration, falling
+     * back to {@link #DEFAULT_FRAME_SIZE}, and logs the value at info level on every call.
+     *
+     * @return the configured frame size
+     */
+    static int getConfiguredFrameSize()
+    {
+        final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+        final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+        _logger.info("Framesize set to " + framesize);
+        return framesize;
+    }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles Connection.TuneOk: moves the connection to the not-opened state and starts
+ * heartbeating with the interval carried in the method body.
+ * <p>
+ * A shared instance is handed out by {@link #getInstance()}. No constructor is declared,
+ * so the implicit public default constructor is also available.
+ */
+public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class);
+
+    /** The shared handler instance. */
+    private static final ConnectionTuneOkMethodHandler _instance = new ConnectionTuneOkMethodHandler();
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ConnectionTuneOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Logs the method body at debug level, switches the state manager to
+     * {@link AMQState#CONNECTION_NOT_OPENED} and initialises heartbeats on the session.
+     *
+     * @param stateManager     state manager whose state is moved to CONNECTION_NOT_OPENED
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session on which heartbeats are initialised
+     * @param evt              event carrying the Connection.TuneOk body
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+    {
+        final ConnectionTuneOkBody tuneOk = evt.getMethod();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(tuneOk);
+        }
+
+        // The state change happens before heartbeats are started.
+        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        protocolSession.initHeartbeats(tuneOk.heartbeat);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+/**
+ * Handles Exchange.Declare: registers the named exchange if the registry does not already
+ * hold one under that name, and replies with Exchange.DeclareOk unless the client set nowait.
+ * <p>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
+{
+    private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class);
+
+    /** The one shared handler instance. */
+    private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler();
+
+    /** Factory used to build exchanges that are not yet registered. */
+    private final ExchangeFactory exchangeFactory;
+
+    /**
+     * Looks the exchange factory up in the application registry once, at construction.
+     */
+    private ExchangeDeclareHandler()
+    {
+        exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ExchangeDeclareHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Creates and registers the exchange when the registry has no entry for its name; an
+     * existing entry is left untouched. The check and the registration run under the
+     * registry's monitor.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry registry that is consulted and, if needed, updated
+     * @param protocolSession  session that receives the DeclareOk frame
+     * @param evt              event carrying the Exchange.Declare body and the channel id
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+    {
+        final ExchangeDeclareBody declare = evt.getMethod();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Request to declare exchange of type " + declare.type + " with name " + declare.exchange);
+        }
+
+        synchronized(exchangeRegistry)
+        {
+            if (exchangeRegistry.getExchange(declare.exchange) == null)
+            {
+                final Exchange created = exchangeFactory.createExchange(declare.exchange, declare.type,
+                                                                        declare.durable, declare.passive,
+                                                                        declare.ticket);
+                exchangeRegistry.registerExchange(created);
+            }
+        }
+
+        if (declare.nowait)
+        {
+            // The client asked for no reply.
+            return;
+        }
+        final AMQFrame declareOk = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+        protocolSession.writeFrame(declareOk);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles Exchange.Delete: unregisters the named exchange and replies with Exchange.DeleteOk.
+ * <p>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
+{
+    /** The one shared handler instance. */
+    private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private ExchangeDeleteHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static ExchangeDeleteHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Unregisters the exchange named in the method body, passing its if-unused flag to the
+     * registry, and writes the DeleteOk frame on the event's channel.
+     * <p>
+     * NOTE(review): when the registry raises {@link ExchangeInUseException} nothing is
+     * written and nothing is thrown, so the client gets no reply from this handler.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry registry the exchange is removed from
+     * @param protocolSession  session that receives the DeleteOk frame
+     * @param evt              event carrying the Exchange.Delete body and the channel id
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+    {
+        final ExchangeDeleteBody deleteRequest = evt.getMethod();
+        try
+        {
+            exchangeRegistry.unregisterExchange(deleteRequest.exchange, deleteRequest.ifUnused);
+            final AMQFrame deleteOk = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+            protocolSession.writeFrame(deleteOk);
+        }
+        catch (ExchangeInUseException ignored)
+        {
+            // TODO: sort out consistent channel close mechanism that does all clean up etc.
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.concurrent.Executor;
+
+/**
+ * An {@link Executor} that runs each submitted task immediately, on the thread that
+ * called {@link #execute(Runnable)}. The call returns once the task's
+ * {@code run()} method has returned.
+ */
+public class OnCurrentThreadExecutor implements Executor
+{
+    /**
+     * Runs the given task synchronously on the calling thread.
+     *
+     * @param command the task to run
+     */
+    public void execute(final Runnable command)
+    {
+        command.run();
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueBindHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueBindHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueBindHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles Queue.Bind: resolves the target queue and exchange, binds them with the
+ * requested routing key and replies with Queue.BindOk unless the client set nowait.
+ * <p>
+ * One shared instance is handed out by {@link #getInstance()}.
+ */
+public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
+{
+    private static final Logger _log = Logger.getLogger(QueueBindHandler.class);
+
+    /** The one shared handler instance. */
+    private static final QueueBindHandler _instance = new QueueBindHandler();
+
+    /** Instances are only obtainable through {@link #getInstance()}. */
+    private QueueBindHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static QueueBindHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Binds a queue to an exchange.
+     * <p>
+     * When the body names no queue, the channel's default queue is used and, if the
+     * routing key is also missing, the body's routing key is set to that queue's name
+     * (the body is modified in place). Otherwise the queue is looked up in the registry.
+     * <p>
+     * NOTE(review): the channel returned by the session is dereferenced without a null
+     * check; confirm that a channel always exists for the event's channel id.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    registry used to look up a named queue
+     * @param exchangeRegistry registry used to look up the exchange
+     * @param protocolSession  session providing the channel and receiving the BindOk frame
+     * @param evt              event carrying the Queue.Bind body and the channel id
+     * @throws AMQException if no queue is named and the channel has no default queue; a
+     *                      channel exception with NOT_FOUND is thrown if the named queue
+     *                      or the exchange does not exist
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<QueueBindBody> evt) throws AMQException
+    {
+        final QueueBindBody bindRequest = evt.getMethod();
+        final boolean useDefaultQueue = (bindRequest.queue == null);
+
+        final AMQQueue target = useDefaultQueue
+                ? protocolSession.getChannel(evt.getChannelId()).getDefaultQueue()
+                : queueRegistry.getQueue(bindRequest.queue);
+
+        if (target == null)
+        {
+            if (useDefaultQueue)
+            {
+                throw new AMQException("No default queue defined on channel and queue was null");
+            }
+            throw bindRequest.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + bindRequest.queue + " does not exist.");
+        }
+
+        // Only the default-queue path falls back to the queue name as routing key.
+        if (useDefaultQueue && bindRequest.routingKey == null)
+        {
+            bindRequest.routingKey = target.getName();
+        }
+
+        final Exchange exchange = exchangeRegistry.getExchange(bindRequest.exchange);
+        if (exchange == null)
+        {
+            throw bindRequest.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + bindRequest.exchange + " does not exist.");
+        }
+
+        // The exchange learns about the queue first, then the queue records the binding.
+        exchange.registerQueue(bindRequest.routingKey, target, bindRequest.arguments);
+        target.bind(bindRequest.routingKey, exchange);
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Binding queue " + target + " to exchange " + exchange + " with routing key " + bindRequest.routingKey);
+        }
+
+        if (bindRequest.nowait)
+        {
+            // The client asked for no reply.
+            return;
+        }
+        final AMQFrame bindOk = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+        protocolSession.writeFrame(bindOk);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueBindHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeclareHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicInteger;
+
+ /**
+  * Handles the queue.declare method: looks up or creates the named queue, makes it the
+  * channel's default queue and, unless the client set nowait, answers with queue.declare-ok.
+  */
+ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
+ {
+     private static final Logger _log = Logger.getLogger(QueueDeclareHandler.class);
+
+     /** The single shared handler instance handed out by {@link #getInstance()}. */
+     private static final QueueDeclareHandler _instance = new QueueDeclareHandler();
+
+     /**
+      * When true, every queue created by this handler is also bound to the "amq.direct"
+      * exchange with the queue name as routing key.
+      * Presumably injected by Configurator.configure(this) from "queue.auto_register" - confirm.
+      */
+     @Configured(path = "queue.auto_register", defaultValue = "false")
+     public boolean autoRegister;
+
+     /** Source of the numeric suffix used for broker-generated queue names. */
+     private final AtomicInteger _counter = new AtomicInteger();
+
+     /** Store that is told about durable, non-auto-delete queues when they are created. */
+     private final MessageStore _store;
+
+     protected QueueDeclareHandler()
+     {
+         Configurator.configure(this);
+         _store = ApplicationRegistry.getInstance().getMessageStore();
+     }
+
+     public static QueueDeclareHandler getInstance()
+     {
+         return _instance;
+     }
+
+     /**
+      * Processes one queue.declare request.
+      *
+      * @throws AMQException if queue creation, registration or binding fails
+      */
+     public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                                AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+     {
+         final QueueDeclareBody declare = evt.getMethod();
+
+         if (declare.queue == null)
+         {
+             // no name supplied: generate one, it is reported back to the client in declare-ok
+             declare.queue = createName();
+         }
+         //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+         // lookup-then-create is done while holding the registry monitor
+         synchronized (queueRegistry)
+         {
+             AMQQueue target = queueRegistry.getQueue(declare.queue);
+             if (target == null)
+             {
+                 target = createQueue(declare, queueRegistry, protocolSession);
+                 final boolean persistent = target.isDurable() && !target.isAutoDelete();
+                 if (persistent)
+                 {
+                     _store.createQueue(target);
+                 }
+                 queueRegistry.registerQueue(target);
+                 if (autoRegister)
+                 {
+                     bindToDirectExchange(exchangeRegistry, declare, target);
+                 }
+             }
+             // the declared queue becomes the default queue on the channel
+             protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(target);
+         }
+
+         if (declare.nowait)
+         {
+             // the client does not want a reply
+             return;
+         }
+         AMQFrame declareOk = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), declare.queue, 0L, 0L);
+         _log.info("Queue " + declare.queue + " declared successfully");
+         protocolSession.writeFrame(declareOk);
+     }
+
+     /** Registers the new queue with the "amq.direct" exchange under its own name and records the binding. */
+     private void bindToDirectExchange(ExchangeRegistry exchangeRegistry, QueueDeclareBody declare, AMQQueue target)
+         throws AMQException
+     {
+         Exchange direct = exchangeRegistry.getExchange("amq.direct");
+         direct.registerQueue(declare.queue, target, null);
+         target.bind(declare.queue, direct);
+         _log.info("Queue " + declare.queue + " bound to default exchange");
+     }
+
+     /** Builds a fresh queue name of the form "tmp_" followed by a zero-padded counter value. */
+     protected String createName()
+     {
+         final int sequence = _counter.incrementAndGet();
+         return "tmp_" + pad(sequence);
+     }
+
+     /** Formats the value with the number pattern "0000000000000" (at least thirteen digits). */
+     protected static String pad(int value)
+     {
+         final Object[] arguments = { Integer.valueOf(value) };
+         return MessageFormat.format("{0,number,0000000000000}", arguments);
+     }
+
+     /**
+      * Instantiates the queue described by the declare body. The session's context key is
+      * passed as owner only when the declaration is exclusive; otherwise the owner is null.
+      */
+     protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+         throws AMQException
+     {
+         String owner = null;
+         if (body.exclusive)
+         {
+             owner = session.getContextKey();
+         }
+         return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+     }
+ }
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeclareHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeleteHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeleteHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.AMQException;
+
+ /**
+  * Handles the queue.delete method: deletes the named queue (or the channel's default queue
+  * when no name is given), removes it from the message store and, unless the client set
+  * nowait, answers with queue.delete-ok carrying the count returned by the delete.
+  */
+ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
+ {
+     private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
+
+     public static QueueDeleteHandler getInstance()
+     {
+         return _instance;
+     }
+
+     /** Whether a request for an unknown queue is rejected with a channel exception. */
+     private final boolean _failIfNotFound;
+     private final MessageStore _store;
+
+     /** Creates a handler that rejects requests for unknown queues. */
+     public QueueDeleteHandler()
+     {
+         this(true);
+     }
+
+     /**
+      * @param failIfNotFound when true an unknown queue raises a 404 channel exception;
+      *                       when false such a request is silently ignored
+      */
+     public QueueDeleteHandler(boolean failIfNotFound)
+     {
+         _failIfNotFound = failIfNotFound;
+         _store = ApplicationRegistry.getInstance().getMessageStore();
+     }
+
+     /**
+      * Processes one queue.delete request.
+      *
+      * @throws AMQException if the queue does not exist (and failIfNotFound is set) or the delete fails
+      */
+     public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+     {
+         QueueDeleteBody body = evt.getMethod();
+         AMQQueue queue;
+         if (body.queue == null)
+         {
+             // no explicit name: fall back to the queue last declared on this channel
+             queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+         }
+         else
+         {
+             queue = queues.getQueue(body.queue);
+         }
+
+         if (queue == null)
+         {
+             if (_failIfNotFound)
+             {
+                 // 404 is the same code sibling handlers raise via AMQConstant.NOT_FOUND
+                 throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+             }
+             return;
+         }
+
+         int purged = queue.delete(body.ifUnused, body.ifEmpty);
+         _store.removeQueue(queue.getName());
+         // Honour the nowait bit like the bind/declare handlers do: a client that asked for
+         // no reply must not be sent an unsolicited delete-ok.
+         if (!body.nowait)
+         {
+             session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+         }
+     }
+ }
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/QueueDeleteHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxCommitBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
+{
+ private static TxCommitHandler _instance = new TxCommitHandler();
+
+ public static TxCommitHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private TxCommitHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ {
+
+ try{
+ protocolSession.getChannel(evt.getChannelId()).commit();
+ protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ }catch(AMQException e){
+ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxRollbackHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxRollbackHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxRollbackHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
+
+public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
+{
+ private static TxRollbackHandler _instance = new TxRollbackHandler();
+
+ public static TxRollbackHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private TxRollbackHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+ {
+ try{
+ AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ channel.rollback();
+ protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ channel.resend(protocolSession);
+ }catch(AMQException e){
+ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxRollbackHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+ /**
+  * Handles the tx.select method: marks the addressed channel as transactional and replies
+  * with tx.select-ok.
+  */
+ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
+ {
+     /** The single shared handler instance; final so it can never be swapped after class init. */
+     private static final TxSelectHandler _instance = new TxSelectHandler();
+
+     public static TxSelectHandler getInstance()
+     {
+         return _instance;
+     }
+
+     private TxSelectHandler()
+     {
+     }
+
+     /**
+      * Switches the channel identified by the event into transactional mode and acknowledges it.
+      *
+      * @throws AMQException if the channel or the session reports a failure
+      */
+     public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                                AMQMethodEvent<TxSelectBody> evt) throws AMQException
+     {
+         protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
+         protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+     }
+ }
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/TxSelectHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/jms/JmsConsumer.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/jms/JmsConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/jms/JmsConsumer.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.jms;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+ /**
+  * Mutable holder for the settings of a consumer: prefetch value and units, the no-local,
+  * auto-ack and exclusive flags, and the protocol session it belongs to.
+  */
+ public class JmsConsumer
+ {
+     /** The units a prefetch value can be counted in. */
+     public enum PrefetchUnits
+     {
+         OCTETS,
+         MESSAGES
+     }
+
+     private int _prefetchValue;
+     private PrefetchUnits _prefetchUnits;
+     private boolean _noLocal;
+     private boolean _autoAck;
+     private boolean _exclusive;
+     private AMQProtocolSession _protocolSession;
+
+     // ---- prefetch ----
+
+     public int getPrefetchValue()
+     {
+         return _prefetchValue;
+     }
+
+     public void setPrefetchValue(int prefetchValue)
+     {
+         _prefetchValue = prefetchValue;
+     }
+
+     public PrefetchUnits getPrefetchUnits()
+     {
+         return _prefetchUnits;
+     }
+
+     public void setPrefetchUnits(PrefetchUnits prefetchUnits)
+     {
+         _prefetchUnits = prefetchUnits;
+     }
+
+     // ---- flags ----
+
+     public boolean isNoLocal()
+     {
+         return _noLocal;
+     }
+
+     public void setNoLocal(boolean noLocal)
+     {
+         _noLocal = noLocal;
+     }
+
+     public boolean isAutoAck()
+     {
+         return _autoAck;
+     }
+
+     public void setAutoAck(boolean autoAck)
+     {
+         _autoAck = autoAck;
+     }
+
+     public boolean isExclusive()
+     {
+         return _exclusive;
+     }
+
+     public void setExclusive(boolean exclusive)
+     {
+         _exclusive = exclusive;
+     }
+
+     // ---- session ----
+
+     public AMQProtocolSession getProtocolSession()
+     {
+         return _protocolSession;
+     }
+
+     public void setProtocolSession(AMQProtocolSession protocolSession)
+     {
+         _protocolSession = protocolSession;
+     }
+
+     /**
+      * Placeholder: the method body is empty, so calling it currently does nothing.
+      *
+      * @throws AMQException declared in the signature, never thrown by the current body
+      */
+     public void deliverMessage() throws AMQException
+     {
+     }
+ }
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/jms/JmsConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/AMQManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/AMQManagedObject.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/AMQManagedObject.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/AMQManagedObject.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import javax.management.ListenerNotFoundException;
+import javax.management.NotificationBroadcaster;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+
+/**
+ * This class provides additinal feature of Notification Broadcaster to the
+ * DefaultManagedObject.
+ * @author Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public abstract class AMQManagedObject extends DefaultManagedObject
+ implements NotificationBroadcaster
+{
+ /**
+ * broadcaster support class
+ */
+ protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport();
+
+ /**
+ * sequence number for notifications
+ */
+ protected long _notificationSequenceNumber = 0;
+
+ protected AMQManagedObject(Class<?> managementInterface, String typeName)
+ {
+ super(managementInterface, typeName);
+ }
+
+
+ // notification broadcaster implementation
+
+ public void addNotificationListener(NotificationListener listener,
+ NotificationFilter filter,
+ Object handback)
+ {
+ _broadcaster.addNotificationListener(listener, filter, handback);
+ }
+
+ public void removeNotificationListener(NotificationListener listener)
+ throws ListenerNotFoundException
+ {
+ _broadcaster.removeNotificationListener(listener);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/AMQManagedObject.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/DefaultManagedObject.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/DefaultManagedObject.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/DefaultManagedObject.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,126 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+
+/**
+ * Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful
+ * to extend this class rather than implementing ManagedObject from scratch.
+ *
+ */
+public abstract class DefaultManagedObject implements ManagedObject
+{
+ private Class<?> _managementInterface;
+
+ private String _typeName;
+
+ protected DefaultManagedObject(Class<?> managementInterface, String typeName)
+ {
+ _managementInterface = managementInterface;
+ _typeName = typeName;
+ }
+
+ public String getType()
+ {
+ return _typeName;
+ }
+
+ public Class<?> getManagementInterface()
+ {
+ return _managementInterface;
+ }
+
+ public void register() throws AMQException
+ {
+ try
+ {
+ ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error registering managed object " + this + ": " + e, e);
+ }
+ }
+
+ public void unregister() throws AMQException
+ {
+ try
+ {
+ ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error unregistering managed object: " + this + ": " + e, e);
+ }
+ }
+
+ public String toString()
+ {
+ return getObjectInstanceName() + "[" + getType() + "]";
+ }
+
+ /**
+ * Created the ObjectName as per the JMX Specs
+ * @return ObjectName
+ * @throws MalformedObjectNameException
+ */
+ public ObjectName getObjectName()
+ throws MalformedObjectNameException
+ {
+ String name = jmxEncode(new StringBuffer(getObjectInstanceName()), 0).toString();
+ StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+ objectName.append(":type=").append(getType());
+ objectName.append(",name=").append(name);
+
+ return new ObjectName(objectName.toString());
+ }
+
+ private static StringBuffer jmxEncode(StringBuffer jmxName, int attrPos)
+ {
+ for (int i = attrPos; i < jmxName.length(); i++)
+ {
+ if (jmxName.charAt(i) == ',')
+ {
+ jmxName.setCharAt(i, ';');
+ }
+ else if (jmxName.charAt(i) == ':')
+ {
+ jmxName.setCharAt(i, '-');
+ }
+ else if (jmxName.charAt(i) == '?' ||
+ jmxName.charAt(i) == '*' ||
+ jmxName.charAt(i) == '\\')
+ {
+ jmxName.insert(i, '\\');
+ i++;
+ }
+ else if (jmxName.charAt(i) == '\n')
+ {
+ jmxName.insert(i, '\\');
+ i++;
+ jmxName.setCharAt(i, 'n');
+ }
+ }
+ return jmxName;
+ }
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/DefaultManagedObject.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.log4j.Logger;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+import java.lang.management.ManagementFactory;
+
+ /**
+  * ManagedObjectRegistry implementation that registers managed objects as MBeans with the
+  * platform MBean server.
+  */
+ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
+ {
+     private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class);
+
+     private final MBeanServer _mbeanServer;
+
+     public JMXManagedObjectRegistry()
+     {
+         _log.info("Initialising managed object registry using platform MBean server");
+         // The platform MBean server is used for now; this must be changed or at least made configurable.
+         _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+     }
+
+     /**
+      * Registers the object under its own object name. If the server rejects the object
+      * itself as non-compliant, it is registered again wrapped in a StandardMBean.
+      *
+      * @throws JMException if registration fails, including a failure of the wrapped retry
+      */
+     public void registerObject(ManagedObject managedObject) throws JMException
+     {
+         try
+         {
+             _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
+         }
+         catch (NotCompliantMBeanException notCompliant)
+         {
+             _mbeanServer.registerMBean(asStandardMBean(managedObject), managedObject.getObjectName());
+         }
+     }
+
+     /**
+      * Wraps the managed object in a StandardMBean typed by its management interface.
+      * This is a workaround for the generics added to StandardMBean in JDK 1.6, which give
+      * compile-time safety at the cost of flexibility; hence the unchecked cast.
+      */
+     private static StandardMBean asStandardMBean(ManagedObject managedObject) throws NotCompliantMBeanException
+     {
+         final Object implementation = managedObject;
+         final Class<Object> managementInterface = (Class<Object>) managedObject.getManagementInterface();
+         return new StandardMBean(implementation, managementInterface);
+     }
+
+     /**
+      * Unregisters the MBean stored under the object's name.
+      *
+      * @throws JMException if the name is malformed or the server cannot unregister it
+      */
+     public void unregisterObject(ManagedObject managedObject) throws JMException
+     {
+         _mbeanServer.unregisterMBean(managedObject.getObjectName());
+     }
+ }
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/Managable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/Managable.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/Managable.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/Managable.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+/**
+ * Any object that can return a related MBean should implement this interface.
+ *
+ * This enables other classes to get the managed object, which in turn is useful when
+ * constructing relationships between managed objects without having to maintain
+ * separate data structures containing MBeans.
+ *
+ */
+public interface Managable
+{
+ ManagedObject getManagedObject();
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/Managable.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedBroker.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedBroker.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedBroker.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.server.management;
+
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * The management interface through which the management features of the
+ * Broker are exposed.
+ * It covers creating and removing exchanges and queues.
+ *
+ * @author Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedBroker
+{
+    /** Type name for the broker manager (presumably reported as the managed object's type - confirm). */
+    String TYPE = "BrokerManager";
+
+    /**
+     * Creates a new Exchange.
+     * @param name name of the exchange to create
+     * @param type type of the exchange to create
+     * @param durable the durable setting for the new exchange
+     * @param passive the passive setting for the new exchange
+     * @throws IOException if an I/O error occurs
+     * @throws JMException if a JMX error occurs
+     */
+    void createNewExchange(String name, String type, boolean durable, boolean passive)
+        throws IOException, JMException;
+
+    /**
+     * Unregisters all the channels, queue bindings etc. and unregisters
+     * this exchange from the managed objects.
+     * @param exchange identifies the exchange to unregister
+     * @throws IOException if an I/O error occurs
+     * @throws JMException if a JMX error occurs
+     */
+    void unregisterExchange(String exchange) throws IOException, JMException;
+
+    /**
+     * Creates a new Queue on the Broker server.
+     * @param queueName name of the queue to create
+     * @param durable the durable setting for the new queue
+     * @param owner the owner of the new queue
+     * @param autoDelete the auto-delete setting for the new queue
+     * @throws IOException if an I/O error occurs
+     * @throws JMException if a JMX error occurs
+     */
+    void createQueue(String queueName, boolean durable, String owner, boolean autoDelete)
+        throws IOException, JMException;
+
+    /**
+     * Unregisters the Queue bindings, removes the subscriptions and unregisters
+     * the queue from the managed objects.
+     * @param queueName name of the queue to delete
+     * @throws IOException if an I/O error occurs
+     * @throws JMException if a JMX error occurs
+     */
+    void deleteQueue(String queueName) throws IOException, JMException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedBroker.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObject.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObject.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObject.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.qpid.AMQException;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+
+/**
+ * Interface that all Managable objects should implement.
+ */
+public interface ManagedObject
+{
+    String DOMAIN = "org.apache.qpid";
+
+    /**
+     * @return the name that uniquely identifies this object instance. Uniqueness is only
+     * required among objects of this type at this level in the hierarchy, so it
+     * should not be too difficult to ensure.
+     */
+    String getObjectInstanceName();
+
+    String getType();
+
+    Class<?> getManagementInterface();
+
+    void register() throws AMQException;
+
+    void unregister() throws AMQException;
+
+    /**
+     * Returns the ObjectName required for the mbeanserver registration.
+     * @return the ObjectName of this managed object
+     * @throws MalformedObjectNameException if a valid ObjectName cannot be formed
+     */
+    ObjectName getObjectName() throws MalformedObjectNameException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObject.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObjectRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObjectRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObjectRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import javax.management.JMException;
+
+/**
+ * Handles the registration (and unregistration and so on) of managed objects.
+ *
+ * Managed objects are responsible for exposing attributes, operations and notifications. Because these are
+ * exposed outside the JVM, implementation objects should not be used directly as managed objects; creating
+ * inner classes and exposing those is an effective way of exposing internal state in a controlled way.
+ *
+ * Although we do not explicitly use them while targeting Java 5, the enhanced MXBean approach in Java 6 will
+ * be the obvious choice for managed objects.
+ */
+public interface ManagedObjectRegistry
+{
+    /** Registers the given managed object. */
+    void registerObject(ManagedObject managedObject) throws JMException;
+
+    /** Unregisters the given managed object. */
+    void unregisterObject(ManagedObject managedObject) throws JMException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagedObjectRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagementConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagementConfiguration.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagementConfiguration.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagementConfiguration.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.qpid.configuration.Configured;
+
+/** Management settings: {@link #enabled} is annotated to be configured from "management.enabled" (default "true"). */
+public class ManagementConfiguration
+{
+    @Configured(path = "management.enabled", defaultValue = "true")
+    public boolean enabled;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/ManagementConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/NoopManagedObjectRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/NoopManagedObjectRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/NoopManagedObjectRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.log4j.Logger;
+
+import javax.management.JMException;
+
+/**
+ * Registry that never registers MBeans; usable in tests that need no management or when management is disabled.
+ */
+public class NoopManagedObjectRegistry implements ManagedObjectRegistry
+{
+    private static final Logger _log = Logger.getLogger(NoopManagedObjectRegistry.class);
+
+    public NoopManagedObjectRegistry()
+    {
+        _log.info("Management is disabled");
+    }
+
+    public void registerObject(ManagedObject managedObject) throws JMException
+    {
+        // Intentionally empty: this registry does not register anything.
+    }
+
+    public void unregisterObject(ManagedObject managedObject) throws JMException
+    {
+        // Intentionally empty: nothing was registered, so there is nothing to remove.
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodEvent.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodEvent.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Event handed to AMQMethodListeners to describe a particular method.
+ * It supplies listeners with the:
+ * <ul><li>channel id</li>
+ * <li>protocol method</li></ul>
+ * This means that listeners do not need to be stateful.
+ *
+ * In the StateAwareMethodListener, other useful objects such as the protocol session
+ * are made available.
+ * @param <M> the type of method body this event carries
+ */
+public class AMQMethodEvent<M extends AMQMethodBody>
+{
+    private final int _channelId;
+
+    private final M _method;
+
+    public AMQMethodEvent(int channelId, M method)
+    {
+        _method = method;
+        _channelId = channelId;
+    }
+
+    /** @return the protocol method passed to the constructor */
+    public M getMethod()
+    {
+        return _method;
+    }
+
+    /** @return the channel id passed to the constructor */
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Method event: " + "\nChannel id: " + _channelId + "\nMethod: " + _method;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodListener.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Interface that allows classes to register for interest in protocol method frames.
+ */
+public interface AMQMethodListener
+{
+    /**
+     * Invoked when a method frame has been received.
+     * @param evt the event that contains the method and channel
+     * @param protocolSession the protocol session associated with the event
+     * @param queueRegistry the queue registry made available to the listener
+     * @param exchangeRegistry the exchange registry made available to the listener
+     * @return true if the handler has processed the method frame, false otherwise. Note
+     * that this does not prohibit the method event being delivered to subsequent listeners
+     * but can be used to determine if nobody has dealt with an incoming method frame.
+     * @throws AMQException if an error has occurred. This exception will be delivered to all registered
+     * listeners using the error() method (see below), allowing them to perform cleanup if necessary.
+     */
+    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
+                                                     AMQProtocolSession protocolSession,
+                                                     QueueRegistry queueRegistry,
+                                                     ExchangeRegistry exchangeRegistry) throws AMQException;
+
+    /**
+     * Callback when an error has occurred. Allows listeners to clean up.
+     * @param e the exception describing the error
+     */
+    void error(AMQException e);
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/protocol/AMQMethodListener.java
------------------------------------------------------------------------------
svn:eol-style = native