You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC
svn commit: r1567616 [4/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/
qpid/cpp/bindings/qpid/ruby/ qp...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Wed Feb 12 13:27:51 2014
@@ -741,7 +741,7 @@ public class UpgradeFrom4To5 extends Abs
buf.position(1);
buf = buf.slice();
- metaData.writeToBuffer(0, buf);
+ metaData.writeToBuffer(buf);
output.writeInt(bodySize);
output.writeFast(underlying);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,15 +16,19 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-bdbstore-systests</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid BDB Store System Tests</name>
+ <description>BDB message store system tests</description>
<properties>
<broker.home.dir>target${file.separator}qpid-broker${file.separator}${project.version}</broker.home.dir>
@@ -37,7 +41,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>${junit-version}</version>
<scope>compile</scope>
</dependency>
@@ -51,50 +54,40 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
- <version>1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore-jmx</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
- <version>5.0.84</version>
- <scope>compile</scope>
</dependency>
</dependencies>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Feb 12 13:27:51 2014
@@ -425,10 +425,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -463,10 +475,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -520,10 +544,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/
('svn:mergeinfo' removed)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,14 +16,18 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-core</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid Java Broker Core</name>
+ <description>Broker core functionality and initial configuration</description>
<properties>
<generated-logmessages-dir>${basedir}/src/main/java</generated-logmessages-dir>
@@ -33,50 +37,38 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-management-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j-version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
- <version>2.7.0</version>
- <scope>compile</scope>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
@@ -88,15 +80,11 @@
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
- <version>1.8.3</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-digester</groupId>
<artifactId>commons-digester</artifactId>
- <version>1.8.1</version>
- <scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-beanutils</groupId>
@@ -108,50 +96,36 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
- <version>1.6</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
- <version>2.6</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
- <version>3.2.1</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
- <version>1.8</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
- <version>1.9.0</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.0</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.bcel</groupId>
<artifactId>bcel</artifactId>
- <version>5.2</version>
- <scope>compile</scope>
<exclusions>
<exclusion>
<!-- Qpid doesn't require BCEL InstructionFinder, so does not need jakarta-regexp. -->
@@ -165,7 +139,7 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
@@ -261,12 +235,12 @@
<dependency>
<groupId>velocity</groupId>
<artifactId>velocity</artifactId>
- <version>1.4</version>
+ <version>${velocity-version}</version>
</dependency>
<dependency>
<groupId>velocity</groupId>
<artifactId>velocity-dep</artifactId>
- <version>1.4</version>
+ <version>${velocity-version}</version>
</dependency>
</dependencies>
</plugin>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Feb 12 13:27:51 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -33,14 +34,18 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -66,7 +71,7 @@ public abstract class AbstractExchange i
private VirtualHost _virtualHost;
- private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +143,12 @@ public abstract class AbstractExchange i
if(_closed.compareAndSet(false,true))
{
+ List<Binding> bindings = new ArrayList<Binding>(_bindings);
+ for(Binding binding : bindings)
+ {
+ removeBinding(binding);
+ }
+
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
@@ -145,9 +156,9 @@ public abstract class AbstractExchange i
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Task task : _closeTaskList)
+ for(Action<Exchange> task : _closeTaskList)
{
- task.onClose(this);
+ task.performAction(this);
}
_closeTaskList.clear();
}
@@ -300,12 +311,12 @@ public abstract class AbstractExchange i
return !_referrers.isEmpty();
}
- public void addCloseTask(final Task task)
+ public void addCloseTask(final Action<Exchange> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Task task)
+ public void removeCloseTask(final Action<Exchange> task)
{
_closeTaskList.remove(task);
}
@@ -418,10 +429,10 @@ public abstract class AbstractExchange i
return queues;
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
@@ -579,8 +590,6 @@ public abstract class AbstractExchange i
{
doRemoveBinding(b);
queue.removeBinding(b);
- removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
@@ -659,8 +668,6 @@ public abstract class AbstractExchange i
DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- queue.addQueueDeleteTask(b);
- addCloseTask(b);
queue.addBinding(b);
doAddBinding(b);
b.logCreation();
@@ -673,7 +680,7 @@ public abstract class AbstractExchange i
}
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ private final class BindingImpl extends Binding
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -689,12 +696,6 @@ public abstract class AbstractExchange i
}
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
@@ -729,11 +730,6 @@ public abstract class AbstractExchange i
}
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Wed Feb 12 13:27:51 2014
@@ -32,18 +32,23 @@ import org.apache.qpid.AMQInternalExcept
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -331,10 +336,10 @@ public class DefaultExchange implements
return _id;
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Feb 12 13:27:51 2014
@@ -24,20 +24,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
{
void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -95,19 +91,6 @@ public interface Exchange extends Exchan
void close() throws AMQException;
/**
- * Routes a message
- * @param message the message to be routed
- * @param instanceProperties the instance properties
- * @param txn the transaction to enqueue within
- * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
- * @return the number of queues in which the message was enqueued performed
- */
- int send(ServerMessage message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- BaseQueue.PostEnqueueAction postEnqueueAction);
-
- /**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
* @param bindingKey
* @param arguments
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Wed Feb 12 13:27:51 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -118,11 +119,11 @@ public class FilterSupport
}
}
- static final class NoLocalFilter implements MessageFilter
+ public static final class NoLocalFilter implements MessageFilter
{
- private final AMQQueue _queue;
+ private final MessageSource _queue;
- public NoLocalFilter(AMQQueue queue)
+ public NoLocalFilter(MessageSource queue)
{
_queue = queue;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Wed Feb 12 13:27:51 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.Lo
* 2) We can set new actors at the point we have enough information. i.e.
* - Set a low level ConnectionActor when processing bytes from the wire.
* - Set a ChannelActor when we are processing the frame
- * - Set a SubscriptionActor when we are handling the subscription.
* <p/>
* The code performing the logging need not worry about what type of actor is
* currently set so can perform its logging. The resulting log entry though will
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java Wed Feb 12 13:27:51 2014
@@ -42,6 +42,24 @@ public class GenericActor extends Abstra
_defaultMessageLogger = defaultMessageLogger;
}
+ public GenericActor(final String logSubject)
+ {
+ this(new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return logSubject;
+ }
+ });
+ }
+
+
+ public GenericActor(LogSubject logSubject)
+ {
+ this(logSubject, CurrentActor.get().getRootMessageLogger());
+ }
+
public GenericActor(LogSubject logSubject, RootMessageLogger rootLogger)
{
super(rootLogger);
@@ -53,6 +71,11 @@ public class GenericActor extends Abstra
return _logSubject.toLogString();
}
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
public static LogActor getInstance(final String logMessage, RootMessageLogger rootLogger)
{
return new GenericActor(new LogSubject()
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1567616&r1=1567584&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Feb 12 13:27:51 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.message.internal;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.MessageStore;
@@ -140,6 +141,10 @@ public class InternalMessage extends Abs
return new InternalMessage(handle, internalHeader, bodyObject);
}
+ catch(AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
catch (IOException e)
{
throw new RuntimeException(e);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AccessControlProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AccessControlProvider.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AccessControlProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AccessControlProvider.java Wed Feb 12 13:27:51 2014
@@ -27,9 +27,7 @@ import org.apache.qpid.server.security.A
public interface AccessControlProvider extends ConfiguredObject
{
- public static final String ID = "id";
public static final String DESCRIPTION = "description";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java Wed Feb 12 13:27:51 2014
@@ -29,9 +29,7 @@ import org.apache.qpid.server.security.S
public interface AuthenticationProvider extends ConfiguredObject
{
- public static final String ID = "id";
public static final String DESCRIPTION = "description";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java Wed Feb 12 13:27:51 2014
@@ -43,9 +43,7 @@ public interface Binding extends Configu
public String ARGUMENTS = "arguments";
public String CREATED = "created";
public String DURABLE = "durable";
- public String ID = "id";
public String LIFETIME_POLICY = "lifetimePolicy";
- public String NAME = "name";
public String STATE = "state";
public String TIME_TO_LIVE = "timeToLive";
public String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Wed Feb 12 13:27:51 2014
@@ -48,9 +48,7 @@ public interface Broker extends Configur
String SUPPORTED_PREFERENCES_PROVIDERS_TYPES = "supportedPreferencesProviderTypes";
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,14 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+@AmqpManagement(
+ attributes = {
+ ConfiguredObject.ID,
+ ConfiguredObject.NAME
+ },
+ operations = {},
+ creatable = false
+)
/**
* An object that can be "managed" (eg via the web interface) and usually read from configuration.
*/
@@ -32,6 +40,9 @@ public interface ConfiguredObject
{
final String DESIRED_STATE = "desiredState";
+ public static final String ID = "id";
+ public static final String NAME = "name";
+// public static final String TYPE = "type";
/**
* Get the universally unique identifier for the object
*
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Wed Feb 12 13:27:51 2014
@@ -24,6 +24,32 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+@AmqpManagement(
+ attributes = {
+ Connection.ID,
+ Connection.NAME,
+ Connection.STATE,
+ Connection.DURABLE,
+ Connection.LIFETIME_POLICY,
+ Connection.TIME_TO_LIVE,
+ Connection.CREATED,
+ Connection.UPDATED,
+ Connection.CLIENT_ID,
+ Connection.CLIENT_VERSION,
+ Connection.INCOMING,
+ Connection.LOCAL_ADDRESS,
+ Connection.PRINCIPAL,
+ Connection.PROPERTIES,
+ Connection.REMOTE_ADDRESS,
+ Connection.REMOTE_PROCESS_NAME,
+ Connection.REMOTE_PROCESS_PID,
+ Connection.SESSION_COUNT_LIMIT,
+ Connection.TRANSPORT,
+ Connection.PORT
+ },
+ operations = {},
+ creatable = false
+)
public interface Connection extends ConfiguredObject
{
@@ -59,8 +85,6 @@ public interface Connection extends Conf
// Attributes
- public static final String ID = "id";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Wed Feb 12 13:27:51 2014
@@ -33,9 +33,7 @@ public interface Consumer extends Config
public String SETTLEMENT_MODE = "settlementMode";
public String CREATED = "created";
public String DURABLE = "durable";
- public String ID = "id";
public String LIFETIME_POLICY = "lifetimePolicy";
- public String NAME = "name";
public String STATE = "state";
public String TIME_TO_LIVE = "timeToLive";
public String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,22 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+@AmqpManagement(
+ attributes = {
+ Exchange.ID,
+ Exchange.NAME,
+ Exchange.STATE,
+ Exchange.DURABLE,
+ Exchange.LIFETIME_POLICY,
+ Exchange.TIME_TO_LIVE,
+ Exchange.CREATED,
+ Exchange.UPDATED,
+ Exchange.ALTERNATE_EXCHANGE,
+ Exchange.TYPE
+ },
+ operations = {}
+)
+
public interface Exchange extends ConfiguredObject
{
String BINDING_COUNT = "bindingCount";
@@ -47,9 +63,7 @@ public interface Exchange extends Config
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Group.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Group.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Group.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Group.java Wed Feb 12 13:27:51 2014
@@ -27,9 +27,7 @@ public interface Group extends Configure
{
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupMember.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupMember.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupMember.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupMember.java Wed Feb 12 13:27:51 2014
@@ -27,9 +27,7 @@ public interface GroupMember extends Con
{
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupProvider.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupProvider.java Wed Feb 12 13:27:51 2014
@@ -27,9 +27,7 @@ import java.util.Set;
public interface GroupProvider extends ConfiguredObject
{
- public static final String ID = "id";
public static final String DESCRIPTION = "description";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/KeyStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/KeyStore.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/KeyStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/KeyStore.java Wed Feb 12 13:27:51 2014
@@ -28,8 +28,6 @@ import javax.net.ssl.KeyManager;
public interface KeyStore extends ConfiguredObject
{
- String ID = "id";
- String NAME = "name";
String DURABLE = "durable";
String LIFETIME_POLICY = "lifetimePolicy";
String STATE = "state";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Plugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Plugin.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Plugin.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Plugin.java Wed Feb 12 13:27:51 2014
@@ -29,9 +29,7 @@ public interface Plugin extends Configur
//Hack, using it for the class name only for consistency with the other things.
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java Wed Feb 12 13:27:51 2014
@@ -29,9 +29,7 @@ public interface Port extends Configured
{
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/PreferencesProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/PreferencesProvider.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/PreferencesProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/PreferencesProvider.java Wed Feb 12 13:27:51 2014
@@ -29,8 +29,6 @@ import java.util.Set;
public interface PreferencesProvider extends ConfiguredObject
{
- String ID = "id";
- String NAME = "name";
String TYPE = "type";
String CREATED = "created";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,40 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+@AmqpManagement(
+ attributes = {
+ Queue.ID,
+ Queue.NAME,
+ Queue.DESCRIPTION,
+ Queue.STATE,
+ Queue.DURABLE,
+ Queue.LIFETIME_POLICY,
+ Queue.TIME_TO_LIVE,
+ Queue.CREATED,
+ Queue.UPDATED,
+ Queue.QUEUE_TYPE,
+ Queue.ALTERNATE_EXCHANGE,
+ Queue.EXCLUSIVE,
+ Queue.OWNER,
+ Queue.NO_LOCAL,
+ Queue.LVQ_KEY,
+ Queue.SORT_KEY,
+ Queue.MESSAGE_GROUP_KEY,
+ Queue.MESSAGE_GROUP_SHARED_GROUPS,
+ Queue.MAXIMUM_DELIVERY_ATTEMPTS,
+ Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,
+ Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,
+ Queue.QUEUE_FLOW_STOPPED,
+ Queue.ALERT_THRESHOLD_MESSAGE_AGE,
+ Queue.ALERT_THRESHOLD_MESSAGE_SIZE,
+ Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ Queue.ALERT_REPEAT_GAP,
+ Queue.PRIORITIES
+ },
+ operations = {}
+)
+
public interface Queue extends ConfiguredObject
{
public static final String BINDING_COUNT = "bindingCount";
@@ -69,9 +103,7 @@ public interface Queue extends Configure
- public static final String ID = "id";
public static final String DESCRIPTION = "description";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
@@ -98,7 +130,7 @@ public interface Queue extends Configure
public static final String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped";
public static final String SORT_KEY = "sortKey";
- public static final String TYPE = "type";
+ public static final String QUEUE_TYPE = "queueType";
public static final String PRIORITIES = "priorities";
public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
@@ -118,7 +150,7 @@ public interface Queue extends Configure
TIME_TO_LIVE,
CREATED,
UPDATED,
- TYPE,
+ QUEUE_TYPE,
ALTERNATE_EXCHANGE,
EXCLUSIVE,
OWNER,
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java Wed Feb 12 13:27:51 2014
@@ -26,8 +26,6 @@ import java.util.Collections;
public interface ReplicationNode extends ConfiguredObject
{
- String ID = "id";
- String NAME = "name";
String STATE = "state";
String CREATED = "created";
String DURABLE = "durable";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java Wed Feb 12 13:27:51 2014
@@ -51,8 +51,6 @@ public interface Session extends Configu
XA_TRANSACTION_BRANCH_SUSPENDS));
- public static final String ID = "id";
- public static final String NAME = "name";
public static final String STATE = "state";
public static final String DURABLE = "durable";
public static final String LIFETIME_POLICY = "lifetimePolicy";
@@ -77,6 +75,6 @@ public interface Session extends Configu
CHANNEL_ID,
PRODUCER_FLOW_BLOCKED));
- Collection<Consumer> getSubscriptions();
+ Collection<Consumer> getConsumers();
Collection<Publisher> getPublishers();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/TrustStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/TrustStore.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/TrustStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/TrustStore.java Wed Feb 12 13:27:51 2014
@@ -28,8 +28,6 @@ import javax.net.ssl.TrustManager;
public interface TrustStore extends ConfiguredObject
{
- String ID = "id";
- String NAME = "name";
String DURABLE = "durable";
String LIFETIME_POLICY = "lifetimePolicy";
String STATE = "state";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/User.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/User.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/User.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/User.java Wed Feb 12 13:27:51 2014
@@ -30,9 +30,7 @@ public interface User extends Configured
{
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.model;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
@@ -31,6 +32,43 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+@AmqpManagement(
+ attributes = {
+ VirtualHost.ID,
+ VirtualHost.NAME,
+ VirtualHost.TYPE,
+ VirtualHost.STATE,
+ VirtualHost.DURABLE,
+ VirtualHost.LIFETIME_POLICY,
+ VirtualHost.TIME_TO_LIVE,
+ VirtualHost.CREATED,
+ VirtualHost.UPDATED,
+ VirtualHost.SUPPORTED_EXCHANGE_TYPES,
+ VirtualHost.SUPPORTED_QUEUE_TYPES,
+ VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED,
+ VirtualHost.HOUSEKEEPING_CHECK_PERIOD,
+ VirtualHost.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS,
+ VirtualHost.QUEUE_FLOW_CONTROL_SIZE_BYTES,
+ VirtualHost.QUEUE_FLOW_RESUME_SIZE_BYTES,
+ VirtualHost.CONFIG_STORE_TYPE,
+ VirtualHost.CONFIG_STORE_PATH,
+ VirtualHost.STORE_TYPE,
+ VirtualHost.STORE_PATH,
+ VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE,
+ VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_WARN,
+ VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE,
+ VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_WARN,
+ VirtualHost.QUEUE_ALERT_REPEAT_GAP,
+ VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE,
+ VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE,
+ VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ VirtualHost.CONFIG_PATH
+ },
+ operations = {},
+ managesChildren = true
+)
+
public interface VirtualHost extends ConfiguredObject
{
// Statistics
@@ -81,9 +119,7 @@ public interface VirtualHost extends Con
String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes";
String CREATED = "created";
String DURABLE = "durable";
- String ID = "id";
String LIFETIME_POLICY = "lifetimePolicy";
- String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String TYPE = "type";
@@ -152,11 +188,11 @@ public interface VirtualHost extends Con
public static interface Transaction
{
- void dequeue(QueueEntry entry);
+ void dequeue(MessageInstance entry);
- void copy(QueueEntry entry, Queue queue);
+ void copy(MessageInstance entry, Queue queue);
- void move(QueueEntry entry, Queue queue);
+ void move(MessageInstance entry, Queue queue);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Wed Feb 12 13:27:51 2014
@@ -22,33 +22,32 @@ package org.apache.qpid.server.model.ada
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
-public class ConsumerAdapter extends AbstractAdapter implements Consumer
+public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid.server.model.Consumer
{
- private final Subscription _subscription;
+ private final Consumer _consumer;
private final QueueAdapter _queue;
private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
- final Subscription subscription)
+ final Consumer consumer)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
- subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
- String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()), queueAdapter.getTaskExecutor());
- _subscription = subscription;
+ consumer.getSessionModel().getConnectionModel().getRemoteAddressString(),
+ String.valueOf(consumer.getSessionModel().getChannelId()),
+ consumer.getName()), queueAdapter.getTaskExecutor());
+ _consumer = consumer;
_queue = queueAdapter;
_session = sessionAdapter;
_statistics = new ConsumerStatistics();
@@ -57,7 +56,7 @@ public class ConsumerAdapter extends Abs
public String getName()
{
- return _subscription.getConsumerName();
+ return _consumer.getName();
}
public String setName(final String currentName, final String desiredName)
@@ -107,7 +106,7 @@ public class ConsumerAdapter extends Abs
@Override
public Collection<String> getAttributeNames()
{
- return Consumer.AVAILABLE_ATTRIBUTES;
+ return org.apache.qpid.server.model.Consumer.AVAILABLE_ATTRIBUTES;
}
@Override
@@ -147,7 +146,7 @@ public class ConsumerAdapter extends Abs
}
else if(DISTRIBUTION_MODE.equals(name))
{
- return _subscription.acquires() ? "MOVE" : "COPY";
+ return _consumer.acquires() ? "MOVE" : "COPY";
}
else if(SETTLEMENT_MODE.equals(name))
{
@@ -197,11 +196,11 @@ public class ConsumerAdapter extends Abs
{
if(name.equals(BYTES_OUT))
{
- return _subscription.getBytesOut();
+ return _consumer.getBytesOut();
}
else if(name.equals(MESSAGES_OUT))
{
- return _subscription.getMessagesOut();
+ return _consumer.getMessagesOut();
}
else if(name.equals(STATE_CHANGED))
{
@@ -209,11 +208,11 @@ public class ConsumerAdapter extends Abs
}
else if(name.equals(UNACKNOWLEDGED_BYTES))
{
- return _subscription.getUnacknowledgedBytes();
+ return _consumer.getUnacknowledgedBytes();
}
else if(name.equals(UNACKNOWLEDGED_MESSAGES))
{
- return _subscription.getUnacknowledgedMessages();
+ return _consumer.getUnacknowledgedMessages();
}
return null; // TODO - Implement
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Wed Feb 12 13:27:51 2014
@@ -26,16 +26,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -47,10 +46,12 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.MapValueConverter;
-final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter implements Queue,
+ MessageSource.ConsumerRegistrationListener<Q>,
+ AMQQueue.NotificationListener
{
@SuppressWarnings("serial")
static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -66,25 +67,26 @@ final class QueueAdapter extends Abstrac
put(DESCRIPTION, String.class);
}});
- private final AMQQueue _queue;
+ private final AMQQueue<?,Q,?> _queue;
+
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
- private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
- new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+ private final Map<Consumer, ConsumerAdapter> _consumerAdapters =
+ new HashMap<Consumer, ConsumerAdapter>();
private final VirtualHostAdapter _vhost;
private QueueStatisticsAdapter _statistics;
private QueueNotificationListener _queueNotificationListener;
- public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
+ public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue<?,Q,?> queue)
{
super(queue.getId(), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
_queue = queue;
- _queue.addSubscriptionRegistrationListener(this);
+ _queue.addConsumerRegistrationListener(this);
populateConsumers();
_statistics = new QueueStatisticsAdapter(queue);
_queue.setNotificationListener(this);
@@ -124,21 +126,20 @@ final class QueueAdapter extends Abstrac
private void populateConsumers()
{
- Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+ Collection<? extends Consumer> actualConsumers = _queue.getConsumers();
synchronized (_consumerAdapters)
{
- Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
- for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+ for(Consumer consumer : actualConsumers)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -153,11 +154,11 @@ final class QueueAdapter extends Abstrac
}
}
- public Collection<Consumer> getConsumers()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -321,7 +322,7 @@ final class QueueAdapter extends Abstrac
{
// TODO
}
- else if(TYPE.equals(name))
+ else if(QUEUE_TYPE.equals(name))
{
// TODO
}
@@ -396,9 +397,10 @@ final class QueueAdapter extends Abstrac
}
else if(LVQ_KEY.equals(name))
{
- if(_queue instanceof ConflationQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof ConflationQueue)
{
- return ((ConflationQueue)_queue).getConflationKey();
+ return ((ConflationQueue)queue).getConflationKey();
}
}
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
@@ -427,22 +429,24 @@ final class QueueAdapter extends Abstrac
}
else if(SORT_KEY.equals(name))
{
- if(_queue instanceof SortedQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof SortedQueue)
{
- return ((SortedQueue)_queue).getSortedPropertyName();
+ return ((SortedQueue)queue).getSortedPropertyName();
}
}
- else if(TYPE.equals(name))
+ else if(QUEUE_TYPE.equals(name))
{
- if(_queue instanceof SortedQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof SortedQueue)
{
return "sorted";
}
- if(_queue instanceof ConflationQueue)
+ if(queue instanceof ConflationQueue)
{
return "lvq";
}
- if(_queue instanceof AMQPriorityQueue)
+ if(queue instanceof PriorityQueue)
{
return "priority";
}
@@ -486,9 +490,10 @@ final class QueueAdapter extends Abstrac
}
else if(PRIORITIES.equals(name))
{
- if(_queue instanceof AMQPriorityQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof PriorityQueue)
{
- return ((AMQPriorityQueue)_queue).getPriorities();
+ return ((PriorityQueue)queue).getPriorities();
}
}
return super.getAttribute(name);
@@ -502,7 +507,7 @@ final class QueueAdapter extends Abstrac
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
return (Collection<C>) getConsumers();
}
@@ -587,19 +592,19 @@ final class QueueAdapter extends Abstrac
return _queue;
}
- public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerAdded(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -609,20 +614,20 @@ final class QueueAdapter extends Abstrac
}
}
- public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerRemoved(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if(adapter != null)
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
if (sessionAdapter != null)
{ // Unregister ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionUnregistered(subscription);
+ sessionAdapter.consumerUnregistered(consumer);
}
childRemoved(adapter);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Wed Feb 12 13:27:51 2014
@@ -34,9 +34,8 @@ import org.apache.qpid.server.model.Publ
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -47,7 +46,7 @@ final class SessionAdapter extends Abstr
private AMQSessionModel _session;
private SessionStatistics _statistics;
- private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -56,11 +55,11 @@ final class SessionAdapter extends Abstr
_statistics = new SessionStatistics();
}
- public Collection<Consumer> getSubscriptions()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -119,29 +118,29 @@ final class SessionAdapter extends Abstr
}
/**
- * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Register a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
* @param adapter the registered ConsumerAdapter.
*/
- void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ void consumerRegistered(Consumer consumer, ConsumerAdapter adapter)
{
synchronized (_consumerAdapters)
{
- _consumerAdapters.put(subscription, adapter);
+ _consumerAdapters.put(consumer, adapter);
}
childAdded(adapter);
}
/**
- * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Unregister a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
*/
- void subscriptionUnregistered(Subscription subscription)
+ void consumerUnregistered(Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if (adapter != null)
{
@@ -188,9 +187,9 @@ final class SessionAdapter extends Abstr
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>) getSubscriptions();
+ return (Collection<C>) getConsumers();
}
else if(clazz == Publisher.class)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Wed Feb 12 13:27:51 2014
@@ -50,6 +50,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -73,7 +74,6 @@ import org.apache.qpid.server.plugin.Vir
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
@@ -203,7 +203,10 @@ public final class VirtualHostAdapter ex
{
if(!_exchangeAdapters.containsKey(exchange))
{
- _exchangeAdapters.put(exchange, new ExchangeAdapter(this,exchange));
+ final ExchangeAdapter adapter = new ExchangeAdapter(this, exchange);
+ _exchangeAdapters.put(exchange, adapter);
+ childAdded(adapter);
+
}
}
}
@@ -221,7 +224,9 @@ public final class VirtualHostAdapter ex
{
if(!_queueAdapters.containsKey(queue))
{
- _queueAdapters.put(queue, new QueueAdapter(this, queue));
+ final QueueAdapter adapter = new QueueAdapter(this, queue);
+ _queueAdapters.put(queue, adapter);
+ childAdded(adapter);
}
}
}
@@ -403,9 +408,9 @@ public final class VirtualHostAdapter ex
{
attributes = new HashMap<String, Object>(attributes);
- if (attributes.containsKey(Queue.TYPE))
+ if (attributes.containsKey(Queue.QUEUE_TYPE))
{
- String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null);
+ String typeAttribute = MapValueConverter.getStringAttribute(Queue.QUEUE_TYPE, attributes, null);
QueueType queueType = null;
try
{
@@ -791,11 +796,11 @@ public final class VirtualHostAdapter ex
op.withinTransaction(new Transaction()
{
- public void dequeue(final QueueEntry entry)
+ public void dequeue(final MessageInstance entry)
{
if(entry.acquire())
{
- txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
@@ -809,7 +814,7 @@ public final class VirtualHostAdapter ex
}
}
- public void copy(QueueEntry entry, Queue queue)
+ public void copy(MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -820,7 +825,7 @@ public final class VirtualHostAdapter ex
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch(AMQException e)
{
@@ -835,7 +840,7 @@ public final class VirtualHostAdapter ex
}
- public void move(final QueueEntry entry, Queue queue)
+ public void move(final MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -849,7 +854,7 @@ public final class VirtualHostAdapter ex
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch (AMQException e)
{
@@ -862,7 +867,7 @@ public final class VirtualHostAdapter ex
entry.release();
}
});
- txn.dequeue(entry.getQueue(), message,
+ txn.dequeue(entry.getOwningResource(), message,
new ServerTransaction.Action()
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Wed Feb 12 13:27:51 2014
@@ -27,12 +27,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
/**
* Session model interface.
* Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
- * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}.
+ * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
*/
public interface AMQSessionModel extends Comparable<AMQSessionModel>
{
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
('svn:mergeinfo' removed)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org