You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [4/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -1,101 +1,187 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicGetEmptyBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.Permission;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
-{
-    private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
-
-    private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
-
-    public static BasicGetMethodHandler getInstance()
-    {
-        return _instance;
-    }
-
-    private BasicGetMethodHandler()
-    {
-    }
-
-    public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
-    {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-        VirtualHost vHost = session.getVirtualHost();
-
-        AMQChannel channel = session.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId);
-        }
-        else
-        {
-            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
-
-            if (queue == null)
-            {
-                _log.info("No queue for '" + body.getQueue() + "'");
-                if(body.getQueue()!=null)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_FOUND,
-                                                      "No such queue, '" + body.getQueue()+ "'");
-                }
-                else
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "No queue name provided, no default queue defined.");
-                }
-            }
-            else
-            {
-
-                //Perform ACLs
-                vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
-
-                if (!queue.performGet(session, channel, !body.getNoAck()))
-                {
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
-                    // TODO - set clusterId
-                    BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
-
-
-                    session.writeFrame(responseBody.generateFrame(channelId));
-                }
-            }
-        }
-    }
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicGetBody;
+import org.apache.qpid.framing.BasicGetEmptyBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
+{
+    private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
+
+    private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
+
+    public static BasicGetMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicGetMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+
+
+        VirtualHost vHost = session.getVirtualHost();
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+        else
+        {
+            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
+            if (queue == null)
+            {
+                _log.info("No queue for '" + body.getQueue() + "'");
+                if(body.getQueue()!=null)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_FOUND,
+                                                      "No such queue, '" + body.getQueue()+ "'");
+                }
+                else
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "No queue name provided, no default queue defined.");
+                }
+            }
+            else
+            {
+
+                //Perform ACLs
+                vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+
+                if (!performGet(queue,session, channel, !body.getNoAck()))
+                {
+                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    // TODO - set clusterId
+                    BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                }
+            }
+        }
+    }
+
+    public static boolean performGet(final AMQQueue queue,
+                                     final AMQProtocolSession session,
+                                     final AMQChannel channel,
+                                     final boolean acks)
+            throws AMQException
+    {
+
+        final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+        final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
+        {
+
+            int _msg;
+
+            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            throws AMQException
+            {
+                singleMessageCredit.useCreditForMessage(entry.getMessage());
+                session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
+                                                                        deliveryTag, queue.getMessageCount());
+
+            }
+        };
+        final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            {
+                channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+            }
+        };
+
+        Subscription sub;
+        if(acks)
+        {
+            sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+        }
+        else
+        {
+            sub = new GetNoAckSubscription(channel,
+                                                 session,
+                                                 null,
+                                                 null,
+                                                 false,
+                                                 singleMessageCredit,
+                                                 getDeliveryMethod,
+                                                 getRecordMethod);
+        }
+
+        queue.registerSubscription(sub,false);
+        queue.flushSubscription(sub);
+        queue.unregisterSubscription(sub);
+        return(!singleMessageCredit.hasCredit());
+
+
+    }
+
+    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+    {
+        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+                               AMQShortString consumerTag, FieldTable filters,
+                               boolean noLocal, FlowCreditManager creditManager,
+                                   ClientDeliveryMethod deliveryMethod,
+                                   RecordDeliveryMethod recordMethod)
+            throws AMQException
+        {
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+        }
+
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return !getCreditManager().useCreditForMessage(msg.getMessage());
+        }
+
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -91,7 +91,7 @@
 
             MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
             info.setExchange(exchange);
-            channel.setPublishFrame(info, session, e);
+            channel.setPublishFrame(info, e);
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -49,8 +47,8 @@
             throw body.getChannelNotFoundException(channelId);
         }
 
-        channel.setPrefetchCount(body.getPrefetchCount());
-        channel.setPrefetchSize(body.getPrefetchSize());
+        channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
+
 
         MethodRegistry methodRegistry = session.getMethodRegistry();
         AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -1,75 +1,54 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.AMQException;
-
-public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
-{
-    private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
-
-    private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
-
-    public static BasicRecoverSyncMethodHandler getInstance()
-    {
-        return _instance;
-    }
-
-    public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
-    {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-        _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
-        AMQChannel channel = session.getChannel(channelId);
-
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId);
-        }
-
-        channel.resend(body.getRequeue());
-
-        // Qpid 0-8 hacks a synchronous -ok onto recover.
-        // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
-        if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
-        {
-            MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
-            AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
-            session.writeFrame(recoverOk.generateFrame(channelId));
-
-        }
-
-    }
-}
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+
+public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
+{
+    private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
+
+    private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
+
+    public static BasicRecoverSyncMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+
+        _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
+        AMQChannel channel = session.getChannel(channelId);
+
+
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+
+        channel.resend(body.getRequeue());
+
+        // Qpid 0-8 hacks a synchronous -ok onto recover.
+        // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+        if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+        {
+            MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+            AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+            session.writeFrame(recoverOk.generateFrame(channelId));
+
+        }
+
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,9 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -49,16 +48,6 @@
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
-
-
-//        if (_logger.isDebugEnabled())
-//        {
-//            _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
-//                          ": Requeue:" + evt.getMethod().requeue +
-////                              ": Resend:" + evt.getMethod().resend +
-//                          " on channel:" + channelId);
-//        }
-
         AMQChannel channel = session.getChannel(channelId);
 
         if (channel == null)
@@ -76,7 +65,7 @@
 
         long deliveryTag = body.getDeliveryTag();
 
-        UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+        QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
 
         if (message == null)
         {
@@ -85,11 +74,16 @@
         }
         else
         {
-            if (message.isQueueDeleted() || message.getQueue().isDeleted())
+            if (message.isQueueDeleted())
             {
                 _logger.warn("Message's Queue as already been purged, unable to Reject. " +
                              "Dropping message should use Dead Letter Queue");
-                //sendtoDeadLetterQueue(msg)                
+                message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+                if(message != null)
+                {
+                    message.discard(channel.getStoreContext());
+                }
+                //sendtoDeadLetterQueue(msg)
                 return;
             }
 
@@ -111,7 +105,7 @@
             // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
             //if (!evt.getMethod().resend)
             {
-                message.entry.reject();
+                message.reject();
             }
 
             if (body.getRequeue())
@@ -121,6 +115,7 @@
             else
             {
                 _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
+                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 //sendtoDeadLetterQueue(AMQMessage message)
 //                message.queue = channel.getDefaultDeadLetterQueue();
 //                channel.requeue(deliveryTag);

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -57,9 +57,6 @@
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
-
-        //fixme Vhost not defined yet
-        //session.getVirtualHost().getAuthenticationManager();
         AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
 
         SaslServer ss = session.getSaslServer();
@@ -72,11 +69,12 @@
         switch (authResult.status)
         {
             case ERROR:
-                // Can't do this as we violate protocol. Need to send Close
-                // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
-                _logger.info("Authentication failed");
-                stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                Exception cause = authResult.getCause();
 
+                _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+                // This should be abstracted
+                stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
                 ConnectionCloseBody connectionCloseBody =
                         methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
@@ -84,7 +82,7 @@
                                                                  body.getClazz(),
                                                                  body.getMethod());
 
-                session.writeFrame(connectionCloseBody.generateFrame(0) );
+                session.writeFrame(connectionCloseBody.generateFrame(0));
                 disposeSaslServer(session);
                 break;
             case SUCCESS:
@@ -96,7 +94,7 @@
                                                                 ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
                                                                 HeartbeatConfig.getInstance().getDelay());
                 session.writeFrame(tuneBody.generateFrame(0));
-                session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));                
+                session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
                 disposeSaslServer(session);
                 break;
             case CONTINUE:

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -93,7 +93,10 @@
             switch (authResult.status)
             {
                 case ERROR:
-                    _logger.info("Authentication failed");
+                    Exception cause = authResult.getCause();
+
+                    _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
                     stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
                     ConnectionCloseBody closeBody =

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Thu Aug 14 20:40:49 2008
@@ -22,12 +22,10 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Thu Aug 14 20:40:49 2008
@@ -101,7 +101,7 @@
             else if (!exchange.getType().equals(body.getType()))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
             }
 
         }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Thu Aug 14 20:40:49 2008
@@ -112,7 +112,7 @@
 
             if (!exch.isBound(routingKey, body.getArguments(), queue))
             {
-                queue.bind(routingKey, body.getArguments(), exch);
+                queue.bind(exch, routingKey, body.getArguments());
             }
         }
         catch (AMQInvalidRoutingKeyException rke)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Aug 14 20:40:49 2008
@@ -34,9 +34,10 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
@@ -116,14 +117,14 @@
                     queue = createQueue(queueName, body, virtualHost, session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
-                        store.createQueue(queue);
+                        store.createQueue(queue, body.getArguments());
                     }
                     queueRegistry.registerQueue(queue);
                     if (autoRegister)
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                        queue.bind(queueName, null, defaultExchange);
+                        queue.bind(defaultExchange, queueName, null);
                         _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
                     }
                 }
@@ -173,7 +174,9 @@
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
         AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
-        final AMQQueue queue = new AMQQueue(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost);
+
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
+                                                                  body.getArguments());
 
 
         if (body.getExclusive() && !body.getDurable())

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Thu Aug 14 20:40:49 2008
@@ -24,11 +24,10 @@
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.QueueDeleteOkBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
@@ -104,15 +103,16 @@
             }
             else
             {
-
+                
                 //Perform ACLs
                 virtualHost.getAccessManager().authorise(session, Permission.DELETE, body, queue);
 
-                int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
+                int purged = queue.delete();
+
 
                 if (queue.isDurable())
                 {
-                    store.removeQueue(queue.getName());
+                    store.removeQueue(queue);
                 }
 
                 MethodRegistry methodRegistry = session.getMethodRegistry();

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Thu Aug 14 20:40:49 2008
@@ -1,121 +1,119 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
-
-public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
-{
-    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
-
-    public static QueuePurgeHandler getInstance()
-    {
-        return _instance;
-    }
-
-    private final boolean _failIfNotFound;
-
-    public QueuePurgeHandler()
-    {
-        this(true);
-    }
-
-    public QueuePurgeHandler(boolean failIfNotFound)
-    {
-        _failIfNotFound = failIfNotFound;
-    }
-
-    public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
-    {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHost virtualHost = session.getVirtualHost();
-        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
-        AMQChannel channel = session.getChannel(channelId);
-
-
-        AMQQueue queue;
-        if(body.getQueue() == null)
-        {
-
-           if (channel == null)
-           {
-               throw body.getChannelNotFoundException(channelId);
-           }
-
-           //get the default queue on the channel:
-           queue = channel.getDefaultQueue();
-            
-            if(queue == null)
-            {
-                if(_failIfNotFound)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
-                }
-            }
-        }
-        else
-        {
-            queue = queueRegistry.getQueue(body.getQueue());
-        }
-
-        if(queue == null)
-        {
-            if(_failIfNotFound)
-            {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
-            }
-        }
-        else
-        {
-
-                //Perform ACLs
-                virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue);
-
-                long purged = queue.clearQueue(channel.getStoreContext());
-
-
-                if(!body.getNowait())
-                {
-
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
-                    session.writeFrame(responseBody.generateFrame(channelId));
-                    
-                }
-        }
-    }
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.access.Permission;
+
+public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
+{
+    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+    public static QueuePurgeHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private final boolean _failIfNotFound;
+
+    public QueuePurgeHandler()
+    {
+        this(true);
+    }
+
+    public QueuePurgeHandler(boolean failIfNotFound)
+    {
+        _failIfNotFound = failIfNotFound;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+        AMQChannel channel = session.getChannel(channelId);
+
+
+        AMQQueue queue;
+        if(body.getQueue() == null)
+        {
+
+           if (channel == null)
+           {
+               throw body.getChannelNotFoundException(channelId);
+           }
+
+           //get the default queue on the channel:
+           queue = channel.getDefaultQueue();
+            
+            if(queue == null)
+            {
+                if(_failIfNotFound)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
+                }
+            }
+        }
+        else
+        {
+            queue = queueRegistry.getQueue(body.getQueue());
+        }
+
+        if(queue == null)
+        {
+            if(_failIfNotFound)
+            {
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+            }
+        }
+        else
+        {
+
+                //Perform ACLs
+                virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue);
+
+                long purged = queue.clearQueue(channel.getStoreContext());
+
+
+                if(!body.getNowait())
+                {
+
+                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                    
+                }
+        }
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Thu Aug 14 20:40:49 2008
@@ -1,24 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
 package org.apache.qpid.server.handler;
 
 import org.apache.log4j.Logger;
@@ -63,7 +42,7 @@
 
 
         final AMQQueue queue;
-        final AMQShortString routingKey;
+        final AMQShortString routingKey;               
 
         if (body.getQueue() == null)
         {
@@ -105,7 +84,7 @@
 
         try
         {
-            queue.unBind(routingKey, body.getArguments(), exch);
+            queue.unBind(exch, routingKey, body.getArguments());
         }
         catch (AMQInvalidRoutingKeyException rke)
         {