You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 10:17:56 UTC
svn commit: r1772514 [1/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/filter/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/or...
Author: rgodfrey
Date: Sun Dec 4 10:17:55 2016
New Revision: 1772514
URL: http://svn.apache.org/viewvc?rev=1772514&view=rev
Log:
QPID-7572 : Remove ConsumerImpl interface and add MessageInstanceConsumer
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (with props)
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Sun Dec 4 10:17:55 2016
@@ -35,9 +35,10 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -56,9 +57,9 @@ public abstract class AbstractConsumerTa
private final boolean _isMultiQueue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
- private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+ private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
- private Iterator<ConsumerImpl> _pullIterator;
+ private Iterator<MessageInstanceConsumer> _pullIterator;
private boolean _notifyWorkDesired;
private final AtomicBoolean _scheduled = new AtomicBoolean();
@@ -117,7 +118,7 @@ public abstract class AbstractConsumerTa
}
}
- for (ConsumerImpl consumer : _consumers)
+ for (MessageInstanceConsumer consumer : _consumers)
{
consumer.setNotifyWorkDesired(desired);
}
@@ -144,13 +145,13 @@ public abstract class AbstractConsumerTa
}
@Override
- public void consumerAdded(final ConsumerImpl sub)
+ public void consumerAdded(final MessageInstanceConsumer sub)
{
_consumers.add(sub);
}
@Override
- public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+ public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
{
if(_consumers.contains(sub))
{
@@ -176,7 +177,7 @@ public abstract class AbstractConsumerTa
return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
}
- private void consumerRemovedInternal(final ConsumerImpl sub)
+ private void consumerRemovedInternal(final MessageInstanceConsumer sub)
{
_consumers.remove(sub);
if(_consumers.isEmpty())
@@ -185,7 +186,7 @@ public abstract class AbstractConsumerTa
}
}
- public List<ConsumerImpl> getConsumers()
+ public List<MessageInstanceConsumer> getConsumers()
{
return _consumers;
}
@@ -203,7 +204,7 @@ public abstract class AbstractConsumerTa
}
@Override
- public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public final long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
doSend(consumer, entry, batch);
@@ -214,14 +215,14 @@ public abstract class AbstractConsumerTa
return entry.getMessage().getSize();
}
- protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
@Override
public boolean sendNextMessage()
{
- AbstractQueue.MessageContainer messageContainer = null;
- ConsumerImpl consumer = null;
+ MessageContainer messageContainer = null;
+ MessageInstanceConsumer consumer = null;
boolean iteratedCompleteList = false;
while (messageContainer == null)
{
@@ -270,12 +271,12 @@ public abstract class AbstractConsumerTa
{
if (_state.compareAndSet(State.OPEN, State.CLOSED))
{
- List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+ List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
_consumers.clear();
setNotifyWorkDesired(false);
- for (ConsumerImpl consumer : consumers)
+ for (MessageInstanceConsumer consumer : consumers)
{
consumer.close();
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java Sun Dec 4 10:17:55 2016
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.consumer;
+
+public enum ConsumerOption
+{
+ ACQUIRES,
+ SEES_REQUEUES,
+ TRANSIENT,
+ EXCLUSIVE,
+ NO_LOCAL,
+ DURABLE
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Sun Dec 4 10:17:55 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.consumer;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -49,9 +50,9 @@ public interface ConsumerTarget
State getState();
- void consumerAdded(ConsumerImpl sub);
+ void consumerAdded(MessageInstanceConsumer sub);
- ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
+ ListenableFuture<Void> consumerRemoved(MessageInstanceConsumer sub);
long getUnacknowledgedBytes();
@@ -59,7 +60,7 @@ public interface ConsumerTarget
AMQSessionModel<?> getSessionModel();
- long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
boolean sendNextMessage();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Sun Dec 4 10:17:55 2016
@@ -31,10 +31,9 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.queue.QueueConsumer;
public class FilterSupport
{
@@ -119,9 +118,9 @@ public class FilterSupport
@PluggableService
public static final class NoLocalFilter implements MessageFilter
{
- private final MessageSource _queue;
+ private final Queue<?> _queue;
- private NoLocalFilter(MessageSource queue)
+ private NoLocalFilter(Queue<?> queue)
{
_queue = queue;
}
@@ -135,8 +134,8 @@ public class FilterSupport
public boolean matches(Filterable message)
{
- final Collection<? extends ConsumerImpl> consumers = _queue.getConsumers();
- for(ConsumerImpl c : consumers)
+ final Collection<QueueConsumer<?>> consumers = _queue.getConsumers();
+ for(QueueConsumer<?> c : consumers)
{
if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
{
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java Sun Dec 4 10:17:55 2016
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public final class MessageContainer
+{
+ private final MessageInstance _messageInstance;
+ private final MessageReference<?> _messageReference;
+ private final boolean _hasNoAvailableMessages;
+
+ public MessageContainer(final boolean hasNoAvailableMessages)
+ {
+ this(null, null, hasNoAvailableMessages);
+ }
+
+ public MessageContainer(final MessageInstance messageInstance,
+ final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
+ {
+ _messageInstance = messageInstance;
+ _messageReference = messageReference;
+ _hasNoAvailableMessages = hasNoAvailableMessages;
+ }
+
+ public MessageInstance getMessageInstance()
+ {
+ return _messageInstance;
+ }
+
+ public MessageReference<?> getMessageReference()
+ {
+ return _messageReference;
+ }
+
+ public boolean hasNoAvailableMessages()
+ {
+ return _hasNoAvailableMessages;
+ }
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java Sun Dec 4 10:17:55 2016
@@ -54,8 +54,8 @@ public class MessageInfoImpl implements
final ServerMessage message = instance.getMessage();
final AMQMessageHeader messageHeader = message.getMessageHeader();
- _deliveredTo = instance.getDeliveredConsumer() == null ? null : String.valueOf(instance.getDeliveredConsumer()
- .getConsumerNumber());
+ _deliveredTo = instance.getAcquiringConsumer() == null ? null : String.valueOf(instance.getAcquiringConsumer()
+ .getIdentifier());
_arrivalTime = message.getArrivalTime() == 0L ? null : new Date(message.getArrivalTime());
_persistent = message.isPersistent();
_messageId = messageHeader.getMessageId();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Sun Dec 4 10:17:55 2016
@@ -21,7 +21,6 @@
package org.apache.qpid.server.message;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -50,27 +49,25 @@ public interface MessageInstance
boolean acquiredByConsumer();
- boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean isAcquiredBy(MessageInstanceConsumer consumer);
- boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer);
void setRedelivered();
boolean isRedelivered();
- ConsumerImpl getDeliveredConsumer();
-
void reject();
- boolean isRejectedBy(ConsumerImpl consumer);
+ boolean isRejectedBy(MessageInstanceConsumer consumer);
boolean getDeliveredToConsumer();
boolean expired();
- boolean acquire(ConsumerImpl sub);
+ boolean acquire(MessageInstanceConsumer sub);
- boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
+ boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer);
boolean makeAcquisitionStealable();
@@ -80,7 +77,7 @@ public interface MessageInstance
Filterable asFilterable();
- ConsumerImpl getAcquiringConsumer();
+ MessageInstanceConsumer getAcquiringConsumer();
MessageEnqueueRecord getEnqueueRecord();
@@ -171,7 +168,7 @@ public interface MessageInstance
}
}
- abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+ abstract class ConsumerAcquiredState<C extends MessageInstanceConsumer> extends EntryState
{
public abstract C getConsumer();
@@ -188,7 +185,7 @@ public interface MessageInstance
}
}
- final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+ final class StealableConsumerAcquiredState<C extends MessageInstanceConsumer> extends ConsumerAcquiredState
{
private final C _consumer;
private final UnstealableConsumerAcquiredState<C> _unstealableState;
@@ -211,7 +208,7 @@ public interface MessageInstance
}
}
- final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+ final class UnstealableConsumerAcquiredState<C extends MessageInstanceConsumer> extends ConsumerAcquiredState
{
private final StealableConsumerAcquiredState<C> _stealableState;
@@ -246,7 +243,7 @@ public interface MessageInstance
void release();
- void release(ConsumerImpl release);
+ void release(MessageInstanceConsumer release);
void delete();
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java Sun Dec 4 10:17:55 2016
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+
+public interface MessageInstanceConsumer
+{
+ boolean isClosed();
+
+ boolean acquires();
+
+ String getName();
+
+ void close();
+
+ void externalStateChange();
+
+ Object getIdentifier();
+
+ MessageContainer pullMessage();
+
+ ConsumerTarget getTarget();
+
+ void setNotifyWorkDesired(boolean desired);
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Sun Dec 4 10:17:55 2016
@@ -23,7 +23,7 @@ package org.apache.qpid.server.message;
import java.util.Collection;
import java.util.EnumSet;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -31,15 +31,15 @@ import org.apache.qpid.server.store.Tran
public interface MessageSource extends TransactionLogResource, MessageNode
{
- ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters,
- Class<? extends ServerMessage> messageClass,
- String consumerName,
- EnumSet<ConsumerImpl.Option> options,
- Integer priority)
+ MessageInstanceConsumer addConsumer(ConsumerTarget target, FilterManager filters,
+ Class<? extends ServerMessage> messageClass,
+ String consumerName,
+ EnumSet<ConsumerOption> options,
+ Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused, QueueDeleted;
- Collection<? extends ConsumerImpl> getConsumers();
+ Collection<? extends MessageInstanceConsumer> getConsumers();
boolean verifySessionAccess(AMQSessionModel<?> session);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Sun Dec 4 10:17:55 2016
@@ -20,10 +20,13 @@
*/
package org.apache.qpid.server.model;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Consumer")
-public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, ConsumerImpl
+public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, MessageInstanceConsumer
{
String DISTRIBUTION_MODE = "distributionMode";
String EXCLUSIVE = "exclusive";
@@ -38,6 +41,8 @@ public interface Consumer<X extends Cons
@ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
+ AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
+
@DerivedAttribute
String getLinkName();
@@ -73,4 +78,15 @@ public interface Consumer<X extends Cons
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetch")
long getUnacknowledgedMessages();
+
+ AMQSessionModel getSessionModel();
+
+ long getConsumerNumber();
+
+ boolean isSuspended();
+
+ boolean seesRequeues();
+
+ boolean isActive();
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Dec 4 10:17:55 2016
@@ -74,7 +74,7 @@ import org.apache.qpid.filter.selector.T
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
@@ -85,6 +85,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.message.MessageInfoImpl;
@@ -695,7 +696,7 @@ public abstract class AbstractQueue<X ex
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> optionSet,
+ final EnumSet<ConsumerOption> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused, QueueDeleted
@@ -758,7 +759,7 @@ public abstract class AbstractQueue<X ex
FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ConsumerAccessRefused,
ExistingConsumerPreventsExclusive, QueueDeleted
@@ -843,13 +844,13 @@ public abstract class AbstractQueue<X ex
throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
}
- boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT);
+ boolean exclusive = optionSet.contains(ConsumerOption.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
- if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL))
+ if(_noLocal && !optionSet.contains(ConsumerOption.NO_LOCAL))
{
optionSet = EnumSet.copyOf(optionSet);
- optionSet.add(ConsumerImpl.Option.NO_LOCAL);
+ optionSet.add(ConsumerOption.NO_LOCAL);
}
if(exclusive && getConsumerCount() != 0)
@@ -891,7 +892,7 @@ public abstract class AbstractQueue<X ex
if(_ensureNondestructiveConsumers)
{
optionSet = EnumSet.copyOf(optionSet);
- optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+ optionSet.removeAll(EnumSet.of(ConsumerOption.SEES_REQUEUES, ConsumerOption.ACQUIRES));
}
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
@@ -1917,41 +1918,6 @@ public abstract class AbstractQueue<X ex
return _queueStatistics.getAvailableCount() != 0;
}
- public static final class MessageContainer
- {
- private final MessageInstance _messageInstance;
- private final MessageReference<?> _messageReference;
- private final boolean _hasNoAvailableMessages;
-
- public MessageContainer(final boolean hasNoAvailableMessages)
- {
- this(null, null, hasNoAvailableMessages);
- }
-
- public MessageContainer(final MessageInstance messageInstance,
- final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
- {
- _messageInstance = messageInstance;
- _messageReference = messageReference;
- _hasNoAvailableMessages = hasNoAvailableMessages;
- }
-
- public MessageInstance getMessageInstance()
- {
- return _messageInstance;
- }
-
- public MessageReference<?> getMessageReference()
- {
- return _messageReference;
- }
-
- public boolean hasNoAvailableMessages()
- {
- return _hasNoAvailableMessages;
- }
- }
-
private static final MessageContainer NO_MESSAGES = new MessageContainer(true);
private static final MessageContainer HAS_MESSAGES = new MessageContainer(false);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Sun Dec 4 10:17:55 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
import org.apache.qpid.server.message.MessageInstance.EntryState;
@@ -183,7 +182,7 @@ public class DefinedGroupMessageGroupMan
}
}
- ConsumerImpl assignedSub = group.getConsumer();
+ QueueConsumer<?> assignedSub = group.getConsumer();
if(assignedSub == sub)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Sun Dec 4 10:17:55 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -220,7 +220,7 @@ public class LastValueQueueList extends
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
super.release(consumer);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sun Dec 4 10:17:55 2016
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
@@ -47,9 +48,9 @@ import org.apache.qpid.server.logging.Ev
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -107,7 +108,7 @@ class QueueConsumerImpl
final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
- EnumSet<Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
final Integer priority)
{
super(parentsMap(queue, target.getSessionModel().getModelObject()),
@@ -116,9 +117,9 @@ class QueueConsumerImpl
_sessionReference = target.getSessionModel().getConnectionReference();
_consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
_filters = filters;
- _acquires = optionSet.contains(Option.ACQUIRES);
- _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
- _isTransient = optionSet.contains(Option.TRANSIENT);
+ _acquires = optionSet.contains(ConsumerOption.ACQUIRES);
+ _seesRequeues = optionSet.contains(ConsumerOption.SEES_REQUEUES);
+ _isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
_target = target;
_queue = queue;
_linkName = consumerName;
@@ -134,7 +135,7 @@ class QueueConsumerImpl
private static Map<String, Object> createAttributeMap(final AMQSessionModel sessionModel,
String linkName,
FilterManager filters,
- EnumSet<Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
Integer priority)
{
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -145,10 +146,10 @@ class QueueConsumerImpl
+ "|"
+ linkName;
attributes.put(NAME, name);
- attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
- attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
- attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
- attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
+ attributes.put(EXCLUSIVE, optionSet.contains(ConsumerOption.EXCLUSIVE));
+ attributes.put(NO_LOCAL, optionSet.contains(ConsumerOption.NO_LOCAL));
+ attributes.put(DISTRIBUTION_MODE, optionSet.contains(ConsumerOption.ACQUIRES) ? "MOVE" : "COPY");
+ attributes.put(DURABLE,optionSet.contains(ConsumerOption.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(priority != null)
{
@@ -220,9 +221,9 @@ class QueueConsumerImpl
}
@Override
- public MessageSource getMessageSource()
+ public Object getIdentifier()
{
- return _queue;
+ return getConsumerNumber();
}
@Override
@@ -333,9 +334,9 @@ class QueueConsumerImpl
}
@Override
- public AbstractQueue.MessageContainer pullMessage()
+ public MessageContainer pullMessage()
{
- AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+ MessageContainer messageContainer = _queue.deliverSingleMessage(this);
if (messageContainer != null)
{
_deliveredCount.incrementAndGet();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Dec 4 10:17:55 2016
@@ -33,8 +33,6 @@ public interface QueueEntry extends Mess
boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
- QueueConsumer getDeliveredConsumer();
-
boolean isQueueDeleted();
QueueEntry getNextNode();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Dec 4 10:17:55 2016
@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
@@ -55,7 +55,7 @@ public abstract class QueueEntryImpl imp
private final MessageReference _message;
- private Set<Long> _rejectedBy = null;
+ private Set<Object> _rejectedBy = null;
private static final EntryState HELD_STATE = new EntryState()
{
@@ -263,7 +263,7 @@ public abstract class QueueEntryImpl imp
boolean acquired = acquire();
if(!acquired)
{
- QueueConsumer consumer = getDeliveredConsumer();
+ QueueConsumer consumer = getAcquiringConsumer();
acquired = removeAcquisitionFromConsumer(consumer);
if(acquired)
{
@@ -304,7 +304,7 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquire(ConsumerImpl sub)
+ public boolean acquire(MessageInstanceConsumer sub)
{
final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
if(acquired)
@@ -315,7 +315,7 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(state instanceof StealableConsumerAcquiredState
@@ -356,13 +356,13 @@ public abstract class QueueEntryImpl imp
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public QueueConsumer<?> getAcquiringConsumer()
{
- ConsumerImpl consumer;
+ QueueConsumer<?> consumer;
EntryState state = _state;
if (state instanceof ConsumerAcquiredState)
{
- consumer = ((ConsumerAcquiredState)state).getConsumer();
+ consumer = ((ConsumerAcquiredState<QueueConsumer<?>>)state).getConsumer();
}
else
{
@@ -372,14 +372,14 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean isAcquiredBy(ConsumerImpl consumer)
+ public boolean isAcquiredBy(MessageInstanceConsumer consumer)
{
EntryState state = _state;
return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
}
@Override
- public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(state instanceof StealableConsumerAcquiredState
@@ -410,7 +410,7 @@ public abstract class QueueEntryImpl imp
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
EntryState state = _state;
if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
@@ -469,24 +469,18 @@ public abstract class QueueEntryImpl imp
return false;
}
- @Override
- public QueueConsumer getDeliveredConsumer()
- {
- return (QueueConsumer) getAcquiringConsumer();
- }
-
public void reject()
{
- QueueConsumer consumer = getDeliveredConsumer();
+ QueueConsumer consumer = getAcquiringConsumer();
if (consumer != null)
{
if (_rejectedBy == null)
{
- _rejectedBy = new HashSet<Long>();
+ _rejectedBy = new HashSet<>();
}
- _rejectedBy.add(consumer.getConsumerNumber());
+ _rejectedBy.add(consumer.getIdentifier());
}
else
{
@@ -494,12 +488,13 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(ConsumerImpl consumer)
+ @Override
+ public boolean isRejectedBy(MessageInstanceConsumer consumer)
{
if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(consumer.getConsumerNumber());
+ return _rejectedBy.contains(consumer.getIdentifier());
}
else // This message hasn't been rejected yet.
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Sun Dec 4 10:17:55 2016
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.MessageSource;
@@ -95,7 +95,7 @@ public class TrustStoreMessageSource ext
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+ final EnumSet<ConsumerOption> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused, QueueDeleted
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Sun Dec 4 10:17:55 2016
@@ -29,18 +29,19 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -86,7 +87,7 @@ public abstract class AbstractSystemMess
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+ final EnumSet<ConsumerOption> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused, QueueDeleted
{
@@ -108,14 +109,14 @@ public abstract class AbstractSystemMess
return true;
}
- protected class Consumer implements ConsumerImpl
+ protected class Consumer implements MessageInstanceConsumer
{
- private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final List<PropertiesMessageInstance> _queue =
Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
private final ConsumerTarget _target;
private final String _name;
+ private final Object _identifier = new Object();
public Consumer(final String consumerName, ConsumerTarget target)
@@ -134,13 +135,19 @@ public abstract class AbstractSystemMess
}
@Override
+ public Object getIdentifier()
+ {
+ return _identifier;
+ }
+
+ @Override
public ConsumerTarget getTarget()
{
return _target;
}
@Override
- public AbstractQueue.MessageContainer pullMessage()
+ public MessageContainer pullMessage()
{
if (!_queue.isEmpty())
{
@@ -148,7 +155,7 @@ public abstract class AbstractSystemMess
if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
_queue.remove(0);
- return new AbstractQueue.MessageContainer(propertiesMessageInstance, null, false);
+ return new MessageContainer(propertiesMessageInstance, null, false);
}
}
return null;
@@ -164,54 +171,6 @@ public abstract class AbstractSystemMess
}
@Override
- public long getBytesOut()
- {
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- return 0;
- }
-
- @Override
- public long getUnacknowledgedBytes()
- {
- return 0;
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return 0;
- }
-
- @Override
- public AMQSessionModel getSessionModel()
- {
- return _target.getSessionModel();
- }
-
- @Override
- public MessageSource getMessageSource()
- {
- return AbstractSystemMessageSource.this;
- }
-
- @Override
- public long getConsumerNumber()
- {
- return _id;
- }
-
- @Override
- public boolean isSuspended()
- {
- return !isActive();
- }
-
- @Override
public boolean isClosed()
{
return false;
@@ -224,24 +183,12 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean seesRequeues()
- {
- return false;
- }
-
- @Override
public void close()
{
_consumers.remove(this);
}
@Override
- public boolean isActive()
- {
- return _target.isNotifyWorkDesired();
- }
-
- @Override
public String getName()
{
return _name;
@@ -307,7 +254,7 @@ public abstract class AbstractSystemMess
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public Consumer getAcquiringConsumer()
{
return _consumer;
}
@@ -319,13 +266,13 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean isAcquiredBy(final ConsumerImpl consumer)
+ public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
{
return consumer == _consumer && !isDeleted();
}
@Override
- public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
{
return consumer == _consumer;
}
@@ -343,19 +290,13 @@ public abstract class AbstractSystemMess
}
@Override
- public Consumer getDeliveredConsumer()
- {
- return isDeleted() ? null : _consumer;
- }
-
- @Override
public void reject()
{
delete();
}
@Override
- public boolean isRejectedBy(final ConsumerImpl consumer)
+ public boolean isRejectedBy(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -373,13 +314,13 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean acquire(final ConsumerImpl sub)
+ public boolean acquire(final MessageInstanceConsumer sub)
{
return false;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -435,7 +376,7 @@ public abstract class AbstractSystemMess
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
release();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Sun Dec 4 10:17:55 2016
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.ServerMessage;
@@ -51,7 +51,7 @@ public class VirtualHostPropertiesNode e
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+ final EnumSet<ConsumerOption> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused, QueueDeleted
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Sun Dec 4 10:17:55 2016
@@ -35,6 +35,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
@@ -44,7 +45,7 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.network.Ticker;
@@ -59,7 +60,7 @@ public class TestConsumerTarget implemen
private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
private boolean _isActive = true;
- private ConsumerImpl _consumer;
+ private MessageInstanceConsumer _consumer;
private MockSessionModel _sessionModel = new MockSessionModel();
private boolean _notifyDesired;
@@ -118,7 +119,7 @@ public class TestConsumerTarget implemen
{
}
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
long size = entry.getMessage().getSize();
if (_messages.contains(entry))
@@ -158,13 +159,13 @@ public class TestConsumerTarget implemen
}
@Override
- public void consumerAdded(final ConsumerImpl sub)
+ public void consumerAdded(final MessageInstanceConsumer sub)
{
_consumer = sub;
}
@Override
- public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+ public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
{
close();
return Futures.immediateFuture(null);
@@ -179,7 +180,7 @@ public class TestConsumerTarget implemen
@Override
public boolean processPending()
{
- AbstractQueue.MessageContainer messageContainer = _consumer.pullMessage();
+ MessageContainer messageContainer = _consumer.pullMessage();
if (messageContainer == null)
{
return false;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun Dec 4 10:17:55 2016
@@ -47,13 +47,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.DirectExchangeImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
@@ -171,8 +172,8 @@ abstract class AbstractQueueTestBase ext
// Check adding a consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
@@ -203,8 +204,8 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
@@ -221,8 +222,8 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, null, null);
_queue.enqueue(messageB, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
@@ -244,8 +245,8 @@ abstract class AbstractQueueTestBase ext
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
@@ -270,8 +271,8 @@ abstract class AbstractQueueTestBase ext
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
@@ -296,8 +297,8 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
@@ -324,8 +325,8 @@ abstract class AbstractQueueTestBase ext
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -380,7 +381,7 @@ abstract class AbstractQueueTestBase ext
}
@Override
- public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public long send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
try
{
@@ -394,8 +395,8 @@ abstract class AbstractQueueTestBase ext
};
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
- ConsumerImpl.Option.ACQUIRES), 0);
+ EnumSet.of(ConsumerOption.SEES_REQUEUES,
+ ConsumerOption.ACQUIRES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -465,8 +466,8 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageC = createMessage(new Long(26));
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -518,12 +519,12 @@ abstract class AbstractQueueTestBase ext
QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -562,8 +563,8 @@ abstract class AbstractQueueTestBase ext
// Check adding an exclusive consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.EXCLUSIVE, ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -585,8 +586,8 @@ abstract class AbstractQueueTestBase ext
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
}
catch (MessageSource.ExistingExclusiveConsumer e)
@@ -599,14 +600,14 @@ abstract class AbstractQueueTestBase ext
// existing consumer
_consumer.close();
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
try
{
_consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.EXCLUSIVE), 0);
+ EnumSet.of(ConsumerOption.EXCLUSIVE), 0);
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Sun Dec 4 10:17:55 2016
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -61,7 +61,7 @@ public class MockMessageInstance impleme
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public MessageInstanceConsumer getAcquiringConsumer()
{
return null;
}
@@ -73,13 +73,13 @@ public class MockMessageInstance impleme
}
@Override
- public boolean isAcquiredBy(final ConsumerImpl consumer)
+ public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
{
return false;
}
@Override
- public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -95,13 +95,13 @@ public class MockMessageInstance impleme
}
@Override
- public boolean acquire(final ConsumerImpl sub)
+ public boolean acquire(final MessageInstanceConsumer sub)
{
return false;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -117,11 +117,6 @@ public class MockMessageInstance impleme
return false;
}
- public ConsumerImpl getDeliveredConsumer()
- {
- return null;
- }
-
public boolean getDeliveredToConsumer()
{
return false;
@@ -147,7 +142,7 @@ public class MockMessageInstance impleme
}
@Override
- public boolean isRejectedBy(final ConsumerImpl consumer)
+ public boolean isRejectedBy(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -158,7 +153,7 @@ public class MockMessageInstance impleme
}
@Override
- public void release(final ConsumerImpl release)
+ public void release(final MessageInstanceConsumer release)
{
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Sun Dec 4 10:17:55 2016
@@ -28,7 +28,7 @@ import java.util.EnumSet;
import junit.framework.AssertionFailedError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,7 +63,7 @@ public class PriorityQueueTest extends A
queue.enqueue(createMessage(9L, (byte) 0), null, null);
// Register subscriber
- queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
+ queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
while(getConsumer().processPending());
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Sun Dec 4 10:17:55 2016
@@ -145,7 +145,9 @@ public abstract class QueueEntryImplTest
StealableConsumerAcquiredState
owningState = new StealableConsumerAcquiredState(consumer);
when(consumer.getOwningState()).thenReturn(owningState);
- when(consumer.getConsumerNumber()).thenReturn(_consumerId++);
+ final Long consumerNum = _consumerId++;
+ when(consumer.getConsumerNumber()).thenReturn(consumerNum);
+ when(consumer.getIdentifier()).thenReturn(consumerNum);
return consumer;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Sun Dec 4 10:17:55 2016
@@ -28,10 +28,10 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -56,8 +56,8 @@ public class StandardQueueTest extends A
ServerMessage message = createMessage(25l);
QueueConsumer consumer =
(QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
getQueue().enqueue(message, null, null);
consumer.close();
@@ -82,8 +82,8 @@ public class StandardQueueTest extends A
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
@@ -139,7 +139,7 @@ public class StandardQueueTest extends A
* @param entry
* @param batch
*/
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
long size = super.send(consumer, entry, batch);
latch.countDown();
@@ -152,8 +152,8 @@ public class StandardQueueTest extends A
null,
entries.get(0).getMessage().getClass(),
"test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
+ EnumSet.of(ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES), 0);
// wait up to 1 minute for message receipt
@@ -271,7 +271,7 @@ public class StandardQueueTest extends A
}
@Override
- public boolean acquire(ConsumerImpl sub)
+ public boolean acquire(MessageInstanceConsumer sub)
{
if(_message.getMessageNumber() % 2 == 0)
{
@@ -284,7 +284,7 @@ public class StandardQueueTest extends A
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return true;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Sun Dec 4 10:17:55 2016
@@ -36,13 +36,14 @@ import java.util.EnumSet;
import java.util.List;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -72,12 +73,12 @@ public class TrustStoreMessageSourceTest
public void testAddConsumer() throws Exception
{
- final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+ final EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
- final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ MessageInstanceConsumer consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final MessageContainer messageContainer = consumer.pullMessage();
assertNotNull("Could not pull message of TrustStore", messageContainer);
final ServerMessage message = messageContainer.getMessageInstance().getMessage();
assertCertificates(getCertificatesFromMessage(message));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org