You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 12:59:50 UTC
svn commit: r1564721 [1/2] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/p...
Author: rgodfrey
Date: Wed Feb 5 11:59:49 2014
New Revision: 1564721
URL: http://svn.apache.org/r1564721
Log:
Use abstractions for sources and destinations for message ingress and egress in all protocol transports
Added:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
- copied, changed from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
Modified:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Feb 5 11:59:49 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -429,7 +430,7 @@ public abstract class AbstractExchange i
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<QueueEntry> postEnqueueAction)
+ final Action<MessageInstance> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Wed Feb 5 11:59:49 2014
@@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -336,7 +337,7 @@ public class DefaultExchange implements
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<QueueEntry> postEnqueueAction)
+ final Action<MessageInstance> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Feb 5 11:59:49 2014
@@ -24,22 +24,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
{
void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -97,19 +91,6 @@ public interface Exchange extends Exchan
void close() throws AMQException;
/**
- * Routes a message
- * @param message the message to be routed
- * @param instanceProperties the instance properties
- * @param txn the transaction to enqueue within
- * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
- * @return the number of queues in which the message was enqueued performed
- */
- int send(ServerMessage message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- Action<QueueEntry> postEnqueueAction);
-
- /**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
* @param bindingKey
* @param arguments
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Wed Feb 5 11:59:49 2014
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public interface MessageDestination
+{
+
+ public String getName();
+
+ /**
+ * Routes a message
+ * @param message the message to be routed
+ * @param instanceProperties the instance properties
+ * @param txn the transaction to enqueue within
+ * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+ * @return the number of queues in which the message was enqueued
+ */
+ int send(ServerMessage message,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ Action<MessageInstance> postEnqueueAction);
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Wed Feb 5 11:59:49 2014
@@ -45,9 +45,9 @@ public interface MessageInstance
void decrementDeliveryCount();
- void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ void addStateChangeListener(StateChangeListener<MessageInstance, State> listener);
- boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener);
boolean acquiredByConsumer();
@@ -71,7 +71,7 @@ public interface MessageInstance
int getMaximumDeliveryCount();
- int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn);
+ int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn);
Filterable asFilterable();
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Wed Feb 5 11:59:49 2014
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.store.TransactionLogResource;
+
+import java.util.Collection;
+import java.util.EnumSet;
+
+public interface MessageSource extends TransactionLogResource
+{
+ Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+ Class<? extends ServerMessage> messageClass,
+ String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
+
+ Collection<Consumer> getConsumers();
+
+ void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+ void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+ AuthorizationHolder getAuthorizationHolder();
+
+ void setAuthorizationHolder(AuthorizationHolder principalHolder);
+
+ void setExclusiveOwningSession(AMQSessionModel owner);
+
+ AMQSessionModel getExclusiveOwningSession();
+
+ boolean isExclusive();
+
+ void enqueue(ServerMessage message) throws AMQException;
+
+ interface ConsumerRegistrationListener
+ {
+ void consumerAdded(AMQQueue queue, Consumer consumer);
+ void consumerRemoved(AMQQueue queue, Consumer consumer);
+ }
+
+ /**
+ * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingExclusiveConsumer extends AMQException
+ {
+
+ public ExistingExclusiveConsumer()
+ {
+ super("");
+ }
+ }
+
+ /**
+ * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingConsumerPreventsExclusive extends AMQException
+ {
+ public ExistingConsumerPreventsExclusive()
+ {
+ super("");
+ }
+ }
+}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Wed Feb 5 11:59:49 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface CapacityChecker
+{
+ void checkCapacity(AMQSessionModel channel);
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Feb 5 11:59:49 2014
@@ -25,25 +25,19 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker
{
- String getName();
public interface NotificationListener
{
@@ -75,29 +69,9 @@ public interface AMQQueue extends Compar
boolean isAutoDelete();
String getOwner();
- AuthorizationHolder getAuthorizationHolder();
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
- void setExclusiveOwningSession(AMQSessionModel owner);
- AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
- Consumer addConsumer(final ConsumerTarget target, final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
-
- Collection<Consumer> getConsumers();
-
- interface ConsumerRegistrationListener
- {
- void consumerAdded(AMQQueue queue, Consumer consumer);
- void consumerRemoved(AMQQueue queue, Consumer consumer);
- }
-
- void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
- void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
-
int getConsumerCount();
@@ -215,8 +189,6 @@ public interface AMQQueue extends Compar
void stop();
- boolean isExclusive();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
@@ -224,51 +196,6 @@ public interface AMQQueue extends Compar
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
- void checkCapacity(AMQSessionModel channel);
-
- /**
- * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingExclusiveConsumer extends AMQException
- {
-
- public ExistingExclusiveConsumer()
- {
- super("");
- }
- }
-
- /**
- * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingConsumerPreventsExclusive extends AMQException
- {
- public ExistingConsumerPreventsExclusive()
- {
- super("");
- }
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb 5 11:59:49 2014
@@ -22,6 +22,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -29,7 +30,7 @@ import org.apache.qpid.server.util.Actio
public interface BaseQueue extends TransactionLogResource
{
void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
+ void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Wed Feb 5 11:59:49 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,19 +241,19 @@ public class DefinedGroupMessageGroupMan
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
private final Group _group;
public GroupStateChangeListener(final Group group,
- final QueueEntry entry)
+ final MessageInstance entry)
{
_group = group;
}
- public void stateChanged(final QueueEntry entry,
- final QueueEntry.State oldState,
- final QueueEntry.State newState)
+ public void stateChanged(final MessageInstance entry,
+ final MessageInstance.State oldState,
+ final MessageInstance.State newState)
{
synchronized (DefinedGroupMessageGroupManager.this)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 5 11:59:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -62,7 +63,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -332,7 +333,7 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
+ for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -363,7 +364,7 @@ public abstract class QueueEntryImpl imp
dispose();
}
- public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -408,21 +409,21 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
- Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
- Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Feb 5 11:59:49 2014
@@ -42,6 +42,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -635,7 +636,7 @@ public class SimpleAMQQueue implements A
enqueue(message, null);
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -1967,7 +1968,7 @@ public class SimpleAMQQueue implements A
return _notificationChecks;
}
- private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
private final QueueConsumer _sub;
@@ -1988,7 +1989,7 @@ public class SimpleAMQQueue implements A
return System.identityHashCode(_sub);
}
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb 5 11:59:49 2014
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -49,7 +50,7 @@ public class SortedQueue extends OutOfOr
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed Feb 5 11:59:49 2014
@@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Feb 5 11:59:49 2014
@@ -154,7 +154,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Wed Feb 5 11:59:49 2014
@@ -105,7 +105,7 @@ public class DistributedTransaction impl
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Wed Feb 5 11:59:49 2014
@@ -363,7 +363,7 @@ public class DtxBranch
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
{
_enqueueRecords.add(new Record(queue, message));
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Feb 5 11:59:49 2014
@@ -197,7 +197,7 @@ public class LocalTransaction implements
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Feb 5 11:59:49 2014
@@ -94,7 +94,7 @@ public interface ServerTransaction
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+ void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
/**
* Enqueue a message(s) to queue(s) registering a post transaction action.
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Feb 5 11:59:49 2014
@@ -47,6 +47,8 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -441,6 +443,12 @@ public abstract class AbstractVirtualHos
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return getQueue(name);
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return _queueRegistry.getQueue(id);
@@ -524,6 +532,13 @@ public abstract class AbstractVirtualHos
}
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return getExchange(name);
+ }
+
@Override
public Exchange getExchange(String name)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb 5 11:59:49 2014
@@ -30,6 +30,8 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -49,6 +51,7 @@ public interface VirtualHost extends Dur
String getName();
AMQQueue getQueue(String name);
+ MessageSource getMessageSource(String name);
AMQQueue getQueue(UUID id);
@@ -76,6 +79,7 @@ public interface VirtualHost extends Dur
void removeExchange(Exchange exchange, boolean force) throws AMQException;
+ MessageDestination getMessageDestination(String name);
Exchange getExchange(String name);
Exchange getExchange(UUID id);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb 5 11:59:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
@@ -305,7 +306,7 @@ public class MockAMQQueue implements AMQ
{
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Wed Feb 5 11:59:49 2014
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.consumer.Consumer;
@@ -62,7 +63,7 @@ public class MockQueueEntry implements Q
return false;
}
- public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
}
@@ -72,7 +73,7 @@ public class MockQueueEntry implements Q
}
- public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn)
{
return 0;
}
@@ -152,7 +153,7 @@ public class MockQueueEntry implements Q
}
- public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
return false;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Feb 5 11:59:49 2014
@@ -245,11 +245,11 @@ public class SimpleAMQQueueTest extends
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -298,11 +298,11 @@ public class SimpleAMQQueueTest extends
Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -356,11 +356,11 @@ public class SimpleAMQQueueTest extends
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -420,11 +420,11 @@ public class SimpleAMQQueueTest extends
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry)entry);
}
};
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Wed Feb 5 11:59:49 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -127,6 +129,12 @@ public class MockVirtualHost implements
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return null;
@@ -174,6 +182,12 @@ public class MockVirtualHost implements
}
@Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return null;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Feb 5 11:59:49 2014
@@ -385,13 +385,13 @@ public class ConsumerTarget_0_10 extends
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- int requeues = entry.routeToAlternate(new Action<QueueEntry>()
+ int requeues = entry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Feb 5 11:59:49 2014
@@ -46,7 +46,6 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,14 +54,16 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -104,12 +105,16 @@ public class ServerSession extends Sessi
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>()
+ private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry entry)
+ public void performAction(final MessageInstance entry)
{
- entry.getQueue().checkCapacity(ServerSession.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
}
};
@@ -188,7 +193,7 @@ public class ServerSession extends Sessi
public int enqueue(final MessageTransferMessage message,
final InstanceProperties instanceProperties,
- final Exchange exchange)
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Feb 5 11:59:49 2014
@@ -35,7 +35,9 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -196,7 +198,7 @@ public class ServerSessionDelegate exten
String queueName = method.getQueue();
VirtualHost vhost = getVirtualHost(session);
- final AMQQueue queue = vhost.getQueue(queueName);
+ final MessageSource queue = vhost.getMessageSource(queueName);
if(queue == null)
{
@@ -308,7 +310,7 @@ public class ServerSessionDelegate exten
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -327,7 +329,6 @@ public class ServerSessionDelegate exten
return;
}
- final Exchange exchangeInUse;
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
@@ -829,24 +830,24 @@ public class ServerSessionDelegate exten
return getVirtualHost(session).getExchange(exchangeName);
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
{
VirtualHost virtualHost = getVirtualHost(ssn);
- Exchange exchange;
+ MessageDestination destination;
if(xfr.hasDestination())
{
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
+ destination = virtualHost.getMessageDestination(xfr.getDestination());
+ if(destination == null)
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
- return exchange;
+ return destination;
}
private VirtualHost getVirtualHost(Session session)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb 5 11:59:49 2014
@@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -256,7 +259,7 @@ public class AMQChannel implements AMQSe
return _channelId;
}
- public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+ public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -265,7 +268,7 @@ public class AMQChannel implements AMQSe
throw new AMQSecurityException("Permission denied: " + e.getName());
}
_currentMessage = new IncomingMessage(info);
- _currentMessage.setExchange(e);
+ _currentMessage.setMessageDestination(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -350,7 +353,7 @@ public class AMQChannel implements AMQSe
}
};
- int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
immediate ? _immediateAction : _capacityCheckAction);
if(enqueues == 0)
{
@@ -497,19 +500,19 @@ public class AMQChannel implements AMQSe
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
+ *
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
+ * @param source the queue to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ FieldTable filters, boolean exclusive) throws AMQException
{
if (tag == null)
{
@@ -557,7 +560,7 @@ public class AMQChannel implements AMQSe
try
{
Consumer sub =
- queue.addConsumer(target,
+ source.addConsumer(target,
FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
AMQMessage.class,
AMQShortString.toString(tag),
@@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSe
}
- private class ImmediateAction implements Action<QueueEntry>
+ private class ImmediateAction implements Action<MessageInstance>
{
public ImmediateAction()
{
}
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- AMQQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
@@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSe
}
else
{
- queue.checkCapacity(AMQChannel.this);
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
}
- private final class CapacityCheckAction implements Action<QueueEntry>
+ private final class CapacityCheckAction implements Action<MessageInstance>
{
@Override
- public void performAction(final QueueEntry entry)
+ public void performAction(final MessageInstance entry)
{
- AMQQueue queue = entry.getQueue();
- queue.checkCapacity(AMQChannel.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
@@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSe
final ServerMessage msg = rejectedQueueEntry.getMessage();
final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
- int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Feb 5 11:59:49 2014
@@ -49,13 +49,13 @@ import java.util.concurrent.atomic.Atomi
public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
- private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
- new StateChangeListener<QueueEntry, QueueEntry.State>()
+ private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
+ new StateChangeListener<MessageInstance, MessageInstance.State>()
{
@Override
- public void stateChanged(final QueueEntry entry,
- final QueueEntry.State oldSate,
- final QueueEntry.State newState)
+ public void stateChanged(final MessageInstance entry,
+ final MessageInstance.State oldSate,
+ final MessageInstance.State newState)
{
if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
{
@@ -463,7 +463,7 @@ public abstract class ConsumerTarget_0_8
_creditManager.restoreCredit(1, message.getSize());
}
- protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
+ protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
{
return _entryReleaseListener;
}
@@ -526,11 +526,11 @@ public abstract class ConsumerTarget_0_8
final long size = entry.getMessage().getSize();
_unacknowledgedBytes.addAndGet(size);
_unacknowledgedCount.incrementAndGet();
- entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
+ entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
{
- public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
{
- if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
{
_unacknowledgedBytes.addAndGet(-size);
_unacknowledgedCount.decrementAndGet();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Wed Feb 5 11:59:49 2014
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private Exchange _exchange;
+ private MessageDestination _messageDestination;
/**
* Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
return _messagePublishInfo.getExchange();
}
- public Exchange getExchange()
+ public MessageDestination getDestination()
{
- return _exchange;
+ return _messageDestination;
}
public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
return getContentHeader().getBodySize();
}
- public void setExchange(final Exchange e)
+ public void setMessageDestination(final MessageDestination e)
{
- _exchange = e;
+ _messageDestination = e;
}
public int getBodyCount() throws AMQException
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb 5 11:59:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler i
if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ queue,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Wed Feb 5 11:59:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchange(exchangeName.toString());
+ MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Wed Feb 5 11:59:49 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
getQueue().deliverAsync();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb 5 11:59:49 2014
@@ -144,6 +144,6 @@ public class QueueBrowserUsesNoAckTest e
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ return channel.consumeFromSource(null, queue, true, filters, true);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org