You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC
svn commit: r1765973 [2/7] - in /qpid/java/branches/transfer-queue:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/stor...
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Oct 21 09:32:07 2016
@@ -21,15 +21,12 @@
package org.apache.qpid.server.message;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-public interface MessageInstance
+public interface MessageInstance extends BaseMessageInstance
{
@@ -50,39 +47,37 @@ public interface MessageInstance
boolean acquiredByConsumer();
- boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean isAcquiredBy(MessageInstanceConsumer consumer);
- boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer);
void setRedelivered();
boolean isRedelivered();
- ConsumerImpl getDeliveredConsumer();
+ MessageInstanceConsumer getDeliveredConsumer();
void reject();
- boolean isRejectedBy(ConsumerImpl consumer);
-
- boolean getDeliveredToConsumer();
+ boolean isRejectedBy(MessageInstanceConsumer consumer);
boolean expired();
- boolean acquire(ConsumerImpl sub);
+ boolean acquire(MessageInstanceConsumer sub);
- boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
+ boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer);
boolean makeAcquisitionStealable();
int getMaximumDeliveryCount();
- int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
+ int routeToAlternate(Action<? super BaseMessageInstance> action, ServerTransaction txn);
Filterable asFilterable();
- ConsumerImpl getAcquiringConsumer();
+ MessageInstanceConsumer getAcquiringConsumer();
- MessageEnqueueRecord getEnqueueRecord();
+ InstanceProperties getInstanceProperties();
enum State
{
@@ -171,7 +166,7 @@ public interface MessageInstance
}
}
- abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+ abstract class ConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends EntryState
{
public abstract C getConsumer();
@@ -188,7 +183,7 @@ public interface MessageInstance
}
}
- final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+ final class StealableConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends ConsumerAcquiredState
{
private final C _consumer;
private final UnstealableConsumerAcquiredState<C> _unstealableState;
@@ -211,7 +206,7 @@ public interface MessageInstance
}
}
- final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+ final class UnstealableConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends ConsumerAcquiredState
{
private final StealableConsumerAcquiredState<C> _stealableState;
@@ -240,25 +235,15 @@ public interface MessageInstance
boolean isAvailable();
- boolean acquire();
-
boolean isAcquired();
void release();
- void release(ConsumerImpl release);
+ void release(MessageInstanceConsumer release);
boolean resend();
- void delete();
-
boolean isDeleted();
boolean isHeld();
-
- ServerMessage getMessage();
-
- InstanceProperties getInstanceProperties();
-
- TransactionLogResource getOwningResource();
}
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageInstanceConsumer
+{
+
+ boolean isClosed();
+
+ boolean acquires();
+
+ String getName();
+
+ void close();
+
+ void flush();
+
+ void externalStateChange();
+
+ Object getIdentifier();
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Fri Oct 21 09:32:07 2016
@@ -23,23 +23,22 @@ package org.apache.qpid.server.message;
import java.util.Collection;
import java.util.EnumSet;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.store.TransactionLogResource;
-public interface MessageSource extends TransactionLogResource, MessageNode
+public interface MessageSource<X extends MessageInstanceConsumer> extends TransactionLogResource, MessageNode
{
- ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters,
- Class<? extends ServerMessage> messageClass,
- String consumerName,
- EnumSet<ConsumerImpl.Option> options,
- Integer priority)
- throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
- ConsumerAccessRefused;
+ X addConsumer(ConsumerTarget target,
+ FilterManager filters,
+ Class<? extends ServerMessage> messageClass,
+ String consumerName,
+ EnumSet<ConsumerOption> options,
+ Integer priority)
+ throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused;
- Collection<? extends ConsumerImpl> getConsumers();
+ Collection<X> getConsumers();
boolean verifySessionAccess(AMQSessionModel<?> session);
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java (from r1765972, qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java&p1=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java&r1=1765972&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java Fri Oct 21 09:32:07 2016
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.model;
-public class NoopConfigurationChangeListener implements ConfigurationChangeListener
+public abstract class AbstractConfigurationChangeListener implements ConfigurationChangeListener
{
@Override
public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Fri Oct 21 09:32:07 2016
@@ -2596,11 +2596,11 @@ public abstract class AbstractConfigured
public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
final Map<String,Object> updateAttributes = new HashMap<>(attributes);
- Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
- runTask(new Task<Void, RuntimeException>()
+ final Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
+ return doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>()
{
@Override
- public Void execute()
+ public ListenableFuture<Void> execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
if (!isSystemProcess())
@@ -2609,7 +2609,29 @@ public abstract class AbstractConfigured
}
changeAttributes(updateAttributes);
- return null;
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+
+
}
@Override
@@ -2630,27 +2652,6 @@ public abstract class AbstractConfigured
return "attributes number=" + attributes.size();
}
});
- if(desiredState != null)
- {
- State state;
- if(desiredState instanceof State)
- {
- state = (State)desiredState;
- }
- else if(desiredState instanceof String)
- {
- state = State.valueOf((String)desiredState);
- }
- else
- {
- throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
- }
- return setDesiredState(state);
- }
- else
- {
- return Futures.immediateFuture(null);
- }
}
public void forceUpdateAllSecureAttributes()
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Fri Oct 21 09:32:07 2016
@@ -469,11 +469,13 @@ public abstract class AbstractSystemConf
return preferenceStoreFactory.createInstance(this, attributes);
}
+ @Override
protected final Principal getSystemPrincipal()
{
return _systemPrincipal;
}
+ @Override
public Runnable getOnContainerResolveTask()
{
return _onContainerResolveTask;
@@ -485,11 +487,13 @@ public abstract class AbstractSystemConf
_onContainerResolveTask = onContainerResolveTask;
}
+ @Override
public Runnable getOnContainerCloseTask()
{
return _onContainerCloseTask;
}
+ @Override
public void setOnContainerCloseTask(final Runnable onContainerCloseTask)
{
_onContainerCloseTask = onContainerCloseTask;
@@ -497,6 +501,7 @@ public abstract class AbstractSystemConf
private class ShutdownService implements Runnable
{
+ @Override
public void run()
{
Subject.doAs(getSystemTaskSubject("Shutdown"),
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = AnonymousCredential.ANONYMOUS)
+public interface AnonymousCredential<X extends AnonymousCredential<X>> extends Credential<X>
+{
+ String ANONYMOUS = "Anonymous";
+
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Fri Oct 21 09:32:07 2016
@@ -120,7 +120,7 @@ public class BrokerImpl extends Abstract
@Override
public Result authorise(final SecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject)
+ final PermissionedObject configuredObject)
{
return isSystemProcess() ? Result.ALLOWED : Result.DEFER;
}
@@ -128,7 +128,7 @@ public class BrokerImpl extends Abstract
@Override
public Result authorise(final SecurityToken token,
final Operation operation,
- final ConfiguredObject<?> configuredObject,
+ final PermissionedObject configuredObject,
final Map<String, Object> arguments)
{
return isSystemProcess() ? Result.ALLOWED : Result.DEFER;
@@ -1135,17 +1135,11 @@ public class BrokerImpl extends Abstract
}
- private final class AccessControlProviderListener implements ConfigurationChangeListener
+ private final class AccessControlProviderListener extends AbstractConfigurationChangeListener
{
private final Set<ConfiguredObject<?>> _bulkChanges = new HashSet<>();
@Override
- public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
- {
-
- }
-
- @Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
if(object.getCategoryClass() == Broker.class && child.getCategoryClass() == AccessControlProvider.class)
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Fri Oct 21 09:32:07 2016
@@ -96,6 +96,11 @@ public final class BrokerModel extends M
addRelationship(VirtualHost.class, VirtualHostAccessControlProvider.class);
addRelationship(VirtualHost.class, Exchange.class);
addRelationship(VirtualHost.class, Queue.class);
+ addRelationship(VirtualHost.class, RemoteHost.class);
+
+
+ addRelationship(RemoteHost.class, RemoteHostAddress.class);
+ addRelationship(RemoteHost.class, Credential.class);
addRelationship(VirtualHostLogger.class, VirtualHostLogInclusionRule.class);
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Fri Oct 21 09:32:07 2016
@@ -40,7 +40,8 @@ import org.apache.qpid.server.store.Conf
/**
* An object that can be "managed" (eg via the web interface) and usually read from configuration.
*/
-public interface ConfiguredObject<X extends ConfiguredObject<X>> extends ContextProvider, TaskExecutorProvider
+public interface ConfiguredObject<X extends ConfiguredObject<X>> extends ContextProvider, TaskExecutorProvider,
+ PermissionedObject
{
String OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT = "Value is too long to display";
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Fri Oct 21 09:32:07 2016
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.model;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
@ManagedObject
-public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, ConsumerImpl
+public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>
{
String DISTRIBUTION_MODE = "distributionMode";
String EXCLUSIVE = "exclusive";
@@ -36,7 +38,7 @@ public interface Consumer<X extends Cons
String SUSPEND_NOTIFICATION_PERIOD = "consumer.suspendNotificationPeriod";
@ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
- long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
+ long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
@ManagedAttribute
String getDistributionMode();
@@ -70,4 +72,21 @@ public interface Consumer<X extends Cons
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetch")
long getUnacknowledgedMessages();
+
+ AMQSessionModel getSessionModel();
+
+ long getConsumerNumber();
+
+ boolean isSuspended();
+
+ boolean seesRequeues();
+
+ boolean trySendLock();
+
+
+ void getSendLock();
+
+ void releaseSendLock();
+
+ boolean isActive();
}
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.List;
+
+import javax.security.sasl.SaslClient;
+
+@ManagedObject(creatable = false)
+public interface Credential<X extends Credential<X>> extends ConfiguredObject<X>
+{
+
+ SaslClient getSaslClient(List<String> mechanisms);
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = ExternalCredential.EXTERNAL)
+public interface ExternalCredential<X extends ExternalCredential<X>> extends Credential<X>
+{
+ String EXTERNAL = "External";
+
+ @ManagedAttribute
+ String getUsername();
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public interface PermissionedObject
+{
+ Class<? extends ConfiguredObject> getCategoryClass();
+
+ String getName();
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Oct 21 09:32:07 2016
@@ -34,11 +34,11 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.CapacityChecker;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.queue.RecoverableBaseQueue;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.util.Deletable;
@@ -46,8 +46,8 @@ import org.apache.qpid.server.util.Delet
@ManagedObject( defaultType = "standard", description = Queue.CLASS_DESCRIPTION )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
Comparable<X>, ExchangeReferrer,
- BaseQueue,
- MessageSource,
+ RecoverableBaseQueue,
+ MessageSource<QueueConsumer<?>>,
CapacityChecker,
MessageDestination,
Deletable<X>
@@ -220,7 +220,7 @@ public interface Queue<X extends Queue<X
Collection<? extends Binding<?>> getBindings();
- Collection<? extends Consumer<?>> getConsumers();
+ Collection<QueueConsumer<?>> getConsumers();
//operations
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+
+@ManagedObject(defaultType = RemoteHost.REMOTE_HOST_TYPE)
+public interface RemoteHost<X extends RemoteHost<X>> extends ConfiguredObject<X>
+{
+
+ String REMOTE_HOST_TYPE = "Standard";
+
+ @ManagedAttribute(defaultValue = "10")
+ int getRetryPeriod();
+
+ @ManagedAttribute(defaultValue = "true")
+ boolean isRedirectFollowed();
+
+ @ManagedAttribute(defaultValue = "[]")
+ Collection<String> getRoutableAddresses();
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+import java.util.List;
+
+@ManagedObject( defaultType = RemoteHostAddress.REMOTE_HOST_ADDRESS_TYPE)
+public interface RemoteHostAddress<X extends RemoteHostAddress<X>> extends ConfiguredObject<X>
+{ // Configured-object interface for one address of a remote host: endpoint, protocol/transport, TLS stores and derived TLS filter lists.
+ /** Type name referenced by the {@code defaultType} of the {@code @ManagedObject} annotation on this interface. */
+ String REMOTE_HOST_ADDRESS_TYPE = "Standard";
+ /** Address of the remote endpoint; declared mandatory. */
+ @ManagedAttribute(mandatory = true)
+ String getAddress();
+ /** Port of the remote endpoint; declared mandatory. */
+ @ManagedAttribute(mandatory = true)
+ int getPort();
+ /** Host name; no mandatory flag or default is declared. NOTE(review): how this differs from the address is not visible here - confirm. */
+ @ManagedAttribute
+ String getHostName();
+ /** Protocol for this address; no default is declared. */
+ @ManagedAttribute
+ Protocol getProtocol();
+ /** Transport for this address; the declared default value is "TCP". */
+ @ManagedAttribute( defaultValue = "TCP" )
+ Transport getTransport();
+ /** Key store associated with this address; no default is declared. */
+ @ManagedAttribute
+ KeyStore getKeyStore();
+ /** Trust stores associated with this address; no default is declared. */
+ @ManagedAttribute
+ Collection<TrustStore> getTrustStores();
+ /** Desired heartbeat interval; the declared default value is "0". NOTE(review): unit and the meaning of 0 are not visible here - confirm. */
+ @ManagedAttribute(defaultValue = "0")
+ int getDesiredHeartbeatInterval();
+
+ /** TLS protocol white list; declared with {@code @DerivedAttribute} rather than {@code @ManagedAttribute}, as are the three lists that follow. */
+ @DerivedAttribute
+ List<String> getTlsProtocolWhiteList();
+ /** TLS protocol black list (derived attribute). */
+ @DerivedAttribute
+ List<String> getTlsProtocolBlackList();
+ /** TLS cipher suite white list (derived attribute). */
+ @DerivedAttribute
+ List<String> getTlsCipherSuiteWhiteList();
+ /** TLS cipher suite black list (derived attribute). */
+ @DerivedAttribute
+ List<String> getTlsCipherSuiteBlackList();
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = UsernamePasswordCredential.USERNAME_PASSWORD)
+public interface UsernamePasswordCredential<X extends UsernamePasswordCredential<X>> extends Credential<X>
+{ // Credential sub-type (declared with category = false) that exposes a username and a password.
+ String USERNAME_PASSWORD = "UsernamePassword"; // type name referenced by the @ManagedObject annotation above
+ /** User name of the credential; no mandatory flag or default is declared. */
+ @ManagedAttribute
+ String getUsername();
+ /** Password of the credential; the attribute is declared with {@code secure = true}. */
+ @ManagedAttribute(secure = true)
+ String getPassword();
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Fri Oct 21 09:32:07 2016
@@ -37,7 +37,9 @@ import org.apache.qpid.server.stats.Stat
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.preferences.UserPreferencesCreator;
+import org.apache.qpid.server.transfer.TransferQueue;
import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
@@ -261,8 +263,12 @@ public interface VirtualHost<X extends V
String getLocalAddress(String routingAddress);
+ TransferQueue getTransferQueue();
+
void setFirstOpening(boolean firstOpening);
+ boolean makeConnection(RemoteHostAddress<?> address, final Action<Boolean> onConnectionLoss);
+
interface Transaction
{
void dequeue(QueueEntry entry);
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java Fri Oct 21 09:32:07 2016
@@ -19,25 +19,16 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
-public interface ProtocolEngineCreator extends Pluggable
+public interface OutboundProtocolEngineCreator extends Pluggable
{
Protocol getVersion();
- byte[] getHeaderIdentifier();
- ProtocolEngine newProtocolEngine(Broker<?> broker,
- ServerNetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id,
- final AggregateTicker aggregateTicker);
+ OutboundProtocolEngine newProtocolEngine(RemoteHostAddress<?> address,
+ VirtualHost<?> virtualHost);
- byte[] getSuggestedAlternativeHeader();
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Oct 21 09:32:07 2016
@@ -33,7 +33,6 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -73,7 +72,6 @@ import org.apache.qpid.filter.selector.T
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
@@ -83,6 +81,8 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInfo;
@@ -724,7 +724,7 @@ public abstract class AbstractQueue<X ex
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> optionSet,
+ final EnumSet<ConsumerOption> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
@@ -777,7 +777,7 @@ public abstract class AbstractQueue<X ex
FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
final Integer priority)
throws ExistingExclusiveConsumer, ConsumerAccessRefused,
ExistingConsumerPreventsExclusive
@@ -787,89 +787,30 @@ public abstract class AbstractQueue<X ex
throw new ExistingExclusiveConsumer();
}
- Object exclusiveOwner = _exclusiveOwner;
- switch(_exclusive)
+ if(_noLocal && !optionSet.contains(ConsumerOption.NO_LOCAL))
{
- case CONNECTION:
- if(exclusiveOwner == null)
- {
- exclusiveOwner = target.getSessionModel().getAMQPConnection();
- addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
- }
- else
- {
- if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
- {
- throw new ConsumerAccessRefused();
- }
- }
- break;
- case SESSION:
- if(exclusiveOwner == null)
- {
- exclusiveOwner = target.getSessionModel();
- addExclusivityConstraint(target.getSessionModel());
- }
- else
- {
- if(exclusiveOwner != target.getSessionModel())
- {
- throw new ConsumerAccessRefused();
- }
- }
- break;
- case LINK:
- if(getConsumerCount() != 0)
- {
- throw new ConsumerAccessRefused();
- }
- break;
- case PRINCIPAL:
- Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
- if(exclusiveOwner == null)
- {
- exclusiveOwner = currentAuthorizedPrincipal;
- }
- else
- {
- if(!Objects.equals(((Principal) exclusiveOwner).getName(), currentAuthorizedPrincipal.getName()))
- {
- throw new ConsumerAccessRefused();
- }
- }
- break;
- case CONTAINER:
- if(exclusiveOwner == null)
- {
- exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
- }
- else
- {
- if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
- {
- throw new ConsumerAccessRefused();
- }
- }
- break;
- case NONE:
- break;
- default:
- throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.add(ConsumerOption.NO_LOCAL);
}
- boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT);
-
- if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL))
+ if(_ensureNondestructiveConsumers)
{
optionSet = EnumSet.copyOf(optionSet);
- optionSet.add(ConsumerImpl.Option.NO_LOCAL);
+ optionSet.removeAll(EnumSet.of(ConsumerOption.SEES_REQUEUES, ConsumerOption.ACQUIRES));
}
+
+
+ boolean exclusive = optionSet.contains(ConsumerOption.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
+
+
+
if(exclusive && getConsumerCount() != 0)
{
throw new ExistingConsumerPreventsExclusive();
}
+
if(!_defaultFiltersMap.isEmpty())
{
if(filters == null)
@@ -902,11 +843,6 @@ public abstract class AbstractQueue<X ex
}
}
- if(_ensureNondestructiveConsumers)
- {
- optionSet = EnumSet.copyOf(optionSet);
- optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
- }
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
@@ -916,7 +852,11 @@ public abstract class AbstractQueue<X ex
optionSet,
priority);
- _exclusiveOwner = exclusiveOwner;
+ checkExclusivity(target);
+
+
+ consumer.open();
+
target.consumerAdded(consumer);
@@ -964,6 +904,80 @@ public abstract class AbstractQueue<X ex
return consumer;
}
+ private void checkExclusivity(final ConsumerTarget target) throws ConsumerAccessRefused // applies the queue's exclusivity policy (_exclusive) to a consumer target
+ { // throws ConsumerAccessRefused when the target conflicts with the current exclusive owner (or, for LINK, with any existing consumer)
+ Object exclusiveOwner = _exclusiveOwner; // local copy; the field is only written back at the end, i.e. when nothing was thrown
+ switch(_exclusive)
+ {
+ case CONNECTION: // owner is the AMQP connection, compared by reference
+ if(exclusiveOwner == null)
+ {
+ exclusiveOwner = target.getSessionModel().getAMQPConnection(); // no owner yet: this target's connection becomes the owner
+ addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
+ }
+ else
+ {
+ if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case SESSION: // owner is the session model, compared by reference
+ if(exclusiveOwner == null)
+ {
+ exclusiveOwner = target.getSessionModel(); // no owner yet: this target's session becomes the owner
+ addExclusivityConstraint(target.getSessionModel());
+ }
+ else
+ {
+ if(exclusiveOwner != target.getSessionModel())
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case LINK: // no owner is tracked; access is refused as soon as the queue already has a consumer
+ if(getConsumerCount() != 0)
+ {
+ throw new ConsumerAccessRefused();
+ }
+ break;
+ case PRINCIPAL: // owner is the connection's authorized principal, compared by name
+ Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+ if(exclusiveOwner == null)
+ {
+ exclusiveOwner = currentAuthorizedPrincipal; // unlike CONNECTION/SESSION, no addExclusivityConstraint call here
+ }
+ else
+ {
+ if(!Objects.equals(((Principal) exclusiveOwner).getName(), currentAuthorizedPrincipal.getName()))
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case CONTAINER: // owner is the connection's remote container name, compared with equals()
+ if(exclusiveOwner == null)
+ {
+ exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
+ }
+ else
+ {
+ if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case NONE: // no exclusivity: nothing to check, owner is left as it was
+ break;
+ default: // any policy value not listed above is reported as a server-scoped runtime error
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
+ }
+ _exclusiveOwner = exclusiveOwner; // publish the (possibly newly chosen) owner
+ }
+
@Override
protected ListenableFuture<Void> beforeClose()
{
@@ -1112,7 +1126,7 @@ public abstract class AbstractQueue<X ex
// ------ Enqueue / Dequeue
- public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+ public final void enqueue(ServerMessage message, Action<? super BaseMessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
incrementQueueCount();
incrementQueueSize(message);
@@ -2057,6 +2071,57 @@ public abstract class AbstractQueue<X ex
txn.commit();
}
+ int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+ { // Sends an acquired entry's message to the alternate exchange (if any), dequeues the entry, and returns the value send() produced (0 without an alternate exchange).
+ if (!queueEntry.isAcquired())
+ {
+ throw new IllegalStateException("Illegal queue entry state. " + queueEntry + " is not acquired."); // report the entry; "this" would print the queue
+ }
+ // A null txn means the caller wants auto-commit: a LocalTransaction is created below and committed before returning.
+ Exchange<?> alternateExchange = getAlternateExchange();
+ boolean autocommit = txn == null;
+ int enqueues;
+
+ if(autocommit)
+ {
+ txn = new LocalTransaction(getVirtualHost().getMessageStore());
+ }
+ // Route via the alternate exchange using the message's initial routing address and the entry's instance properties.
+ if (alternateExchange != null)
+ {
+ enqueues = alternateExchange.send(queueEntry.getMessage(),
+ queueEntry.getMessage().getInitialRoutingAddress(),
+ queueEntry.getInstanceProperties(),
+ txn,
+ action);
+ }
+ else
+ {
+ enqueues = 0;
+ }
+ // The dequeue is registered even when there is no alternate exchange; the entry is deleted once the transaction commits.
+ txn.dequeue(queueEntry.getEnqueueRecord(), new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.delete();
+ }
+
+ public void onRollback()
+ {
+ // no action is taken on rollback
+ }
+ });
+
+ if(autocommit)
+ {
+ txn.commit();
+ }
+
+ return enqueues;
+ }
+
+
private void performQueueDeleteTasks()
{
for (Action<? super X> task : _deleteTaskList)
@@ -3038,7 +3103,7 @@ public abstract class AbstractQueue<X ex
final String routingAddress,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final Action<? super BaseMessageInstance> postEnqueueAction)
{
if (_virtualHost.getState() != State.ACTIVE)
{
@@ -3169,7 +3234,7 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case CONNECTION:
AMQSessionModel session = null;
- for(ConsumerImpl c : getConsumers())
+ for(QueueConsumer c : getConsumers())
{
if(session == null)
{
@@ -3195,7 +3260,7 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case PRINCIPAL:
AMQPConnection con = null;
- for(ConsumerImpl c : getConsumers())
+ for(QueueConsumer c : getConsumers())
{
if(con == null)
{
@@ -3223,7 +3288,7 @@ public abstract class AbstractQueue<X ex
case NONE:
case PRINCIPAL:
String containerID = null;
- for(ConsumerImpl c : getConsumers())
+ for(QueueConsumer c : getConsumers())
{
if(containerID == null)
{
@@ -3254,7 +3319,7 @@ public abstract class AbstractQueue<X ex
case NONE:
case CONTAINER:
Principal principal = null;
- for(ConsumerImpl c : getConsumers())
+ for(QueueConsumer c : getConsumers())
{
if(principal == null)
{
@@ -3494,7 +3559,7 @@ public abstract class AbstractQueue<X ex
}
}
- private class DeletedChildListener implements ConfigurationChangeListener
+ private class DeletedChildListener extends AbstractConfigurationChangeListener
{
@Override
public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
@@ -3504,39 +3569,6 @@ public abstract class AbstractQueue<X ex
AbstractQueue.this.childRemoved(object);
}
}
-
- @Override
- public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
- {
-
- }
-
- @Override
- public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
- {
-
- }
-
- @Override
- public void attributeSet(final ConfiguredObject object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
-
- }
-
- @Override
- public void bulkChangeStart(final ConfiguredObject<?> object)
- {
-
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
-
- }
}
private static class EnqueueRequest
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Fri Oct 21 09:32:07 2016
@@ -21,7 +21,7 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.BaseMessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -29,7 +29,7 @@ import org.apache.qpid.server.util.Actio
public interface BaseQueue extends TransactionLogResource
{
- void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord record);
+ void enqueue(ServerMessage message, Action<? super BaseMessageInstance> action, MessageEnqueueRecord record);
boolean isDurable();
boolean isDeleted();
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Fri Oct 21 09:32:07 2016
@@ -20,21 +20,20 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
-import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.util.StateChangeListener;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.ServerMessage;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import org.apache.qpid.server.util.StateChangeListener;
public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
@@ -183,7 +182,7 @@ public class DefinedGroupMessageGroupMan
}
}
- ConsumerImpl assignedSub = group.getConsumer();
+ QueueConsumer<?> assignedSub = group.getConsumer();
if(assignedSub == sub)
{
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Fri Oct 21 09:32:07 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -220,7 +220,7 @@ public class LastValueQueueList extends
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
super.release(consumer);
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java (from r1765972, qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java&p1=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java&r1=1765972&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -25,23 +25,22 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedBaseQueueEntryList<T extends RecoverableBaseQueue> implements QueueEntryList
{
private final OrderedQueueEntry _head;
private volatile OrderedQueueEntry _tail;
- static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry>
+ static final AtomicReferenceFieldUpdater<OrderedBaseQueueEntryList, OrderedQueueEntry>
_tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail");
+ (OrderedBaseQueueEntryList.class, OrderedQueueEntry.class, "_tail");
- private final Queue<?> _queue;
+ private final T _queue;
static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
_nextUpdater = OrderedQueueEntry._nextUpdater;
@@ -51,7 +50,7 @@ public abstract class OrderedQueueEntryL
private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
- public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+ public OrderedBaseQueueEntryList(T queue, HeadCreator headCreator)
{
_queue = queue;
_head = headCreator.createHead(this);
@@ -73,7 +72,7 @@ public abstract class OrderedQueueEntryL
}
- public Queue<?> getQueue()
+ public T getQueue()
{
return _queue;
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java Fri Oct 21 09:32:07 2016
@@ -34,12 +34,12 @@ public abstract class OrderedQueueEntry
private volatile OrderedQueueEntry _next;
- public OrderedQueueEntry(OrderedQueueEntryList queueEntryList)
+ public OrderedQueueEntry(OrderedBaseQueueEntryList queueEntryList)
{
super(queueEntryList);
}
- public OrderedQueueEntry(OrderedQueueEntryList queueEntryList,
+ public OrderedQueueEntry(OrderedBaseQueueEntryList queueEntryList,
ServerMessage message,
final MessageEnqueueRecord messageEnqueueRecord)
{
@@ -63,7 +63,7 @@ public abstract class OrderedQueueEntry
final OrderedQueueEntry newNext = next.getNextNode();
if(newNext != null)
{
- OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ OrderedBaseQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
next = getNextNode();
}
else
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public abstract class OrderedQueueEntryList extends OrderedBaseQueueEntryList<AbstractQueue<?>>
+{ // Binds OrderedBaseQueueEntryList to AbstractQueue; every override below forwards to the queue returned by getQueue().
+ public OrderedQueueEntryList(final AbstractQueue<?> queue,
+ final HeadCreator headCreator)
+ {
+ super(queue, headCreator);
+ }
+ /** Forwards to the queue's {@code incrementUnackedMsgCount}; the {@code consumer} argument is not used here. */
+ @Override
+ public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+ {
+ getQueue().incrementUnackedMsgCount(queueEntry);
+ }
+ /** Forwards to the queue's {@code decrementUnackedMsgCount}. */
+ @Override
+ public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+ {
+ getQueue().decrementUnackedMsgCount(queueEntry);
+ }
+ /** Forwards the requeue of the entry to the queue. */
+ @Override
+ public void requeue(final QueueEntry queueEntry)
+ {
+ getQueue().requeue(queueEntry);
+ }
+ /** Returns the queue's {@code isHeld} result for the entry at the given evaluation time. */
+ @Override
+ public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+ {
+ return getQueue().isHeld(queueEntry, evaluationTime);
+ }
+ /** Forwards the dequeue of the entry to the queue. */
+ @Override
+ public void dequeue(final QueueEntry queueEntry)
+ {
+ getQueue().dequeue(queueEntry);
+ }
+ /** Returns the result of the queue's {@code routeToAlternate} for the entry, passing the action and transaction through unchanged. */
+ @Override
+ public int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+ {
+ return getQueue().routeToAlternate(queueEntry, action, txn);
+ }
+ /** Returns the queue's maximum delivery attempts setting. */
+ @Override
+ public int getMaximumDeliveryAttempts()
+ {
+ return getQueue().getMaximumDeliveryAttempts();
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Oct 21 09:32:07 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
-public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X>
+public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X>, AcquiringMessageInstanceConsumer<X, ConsumerTarget>
{
void flushBatched();
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Oct 21 09:32:07 2016
@@ -46,9 +46,9 @@ import org.apache.qpid.server.logging.Ev
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -123,7 +123,7 @@ class QueueConsumerImpl
ConsumerTarget target, final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
- EnumSet<Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
final Integer priority)
{
super(parentsMap(queue, target.getSessionModel().getModelObject()),
@@ -132,18 +132,16 @@ class QueueConsumerImpl
_sessionReference = target.getSessionModel().getConnectionReference();
_consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
_filters = filters;
- _acquires = optionSet.contains(Option.ACQUIRES);
- _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
- _isTransient = optionSet.contains(Option.TRANSIENT);
+ _acquires = optionSet.contains(ConsumerOption.ACQUIRES);
+ _seesRequeues = optionSet.contains(ConsumerOption.SEES_REQUEUES);
+ _isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
_target = target;
_queue = queue;
// Access control
authorise(Operation.CREATE);
- open();
- setupLogging();
_listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
{
@@ -165,20 +163,29 @@ class QueueConsumerImpl
getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
}
};
+
+
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ setupLogging();
}
private static Map<String, Object> createAttributeMap(String name,
FilterManager filters,
- EnumSet<Option> optionSet,
+ EnumSet<ConsumerOption> optionSet,
Integer priority)
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, name);
- attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
- attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
- attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
- attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
+ attributes.put(EXCLUSIVE, optionSet.contains(ConsumerOption.EXCLUSIVE));
+ attributes.put(NO_LOCAL, optionSet.contains(ConsumerOption.NO_LOCAL));
+ attributes.put(DISTRIBUTION_MODE, optionSet.contains(ConsumerOption.ACQUIRES) ? "MOVE" : "COPY");
+ attributes.put(DURABLE,optionSet.contains(ConsumerOption.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(priority != null)
{
@@ -279,9 +286,9 @@ class QueueConsumerImpl
}
@Override
- public MessageSource getMessageSource()
+ public Object getIdentifier()
{
- return _queue;
+ return getConsumerNumber();
}
@Override
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Oct 21 09:32:07 2016
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.model.Queue;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
- Queue<?> getQueue();
+ RecoverableBaseQueue getQueue();
long getSize();
boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
- QueueConsumer getDeliveredConsumer();
+ AcquiringMessageInstanceConsumer<?,?> getDeliveredConsumer();
boolean isQueueDeleted();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org