You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [13/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java Fri Sep 20 18:59:30 2013
@@ -54,6 +54,7 @@ public class BrokerProperties
public static final String PROPERTY_QPID_HOME = "QPID_HOME";
public static final String PROPERTY_QPID_WORK = "QPID_WORK";
+ public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size";
private BrokerProperties()
{
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Fri Sep 20 18:59:30 2013
@@ -117,7 +117,7 @@ public class QueueConfiguration extends
public String getExchange()
{
- return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
+ return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
}
public List getRoutingKeys()
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Sep 20 18:59:30 2013
@@ -26,7 +26,6 @@ import org.apache.commons.configuration.
import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.store.MemoryMessageStore;
import java.io.File;
import java.util.HashMap;
@@ -129,7 +128,7 @@ public class VirtualHostConfiguration ex
public String getMessageStoreClass()
{
- return getStringValue("store.class", MemoryMessageStore.class.getName());
+ return getStringValue("store.class", null);
}
public void setMessageStoreClass(String storeFactoryClass)
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/AuthenticationProviderRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/AuthenticationProviderRecoverer.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/AuthenticationProviderRecoverer.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/AuthenticationProviderRecoverer.java Fri Sep 20 18:59:30 2013
@@ -20,23 +20,29 @@
*/
package org.apache.qpid.server.configuration.startup;
+import java.util.Collection;
import java.util.Map;
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory;
public class AuthenticationProviderRecoverer implements ConfiguredObjectRecoverer<AuthenticationProvider>
{
private final AuthenticationProviderFactory _authenticationProviderFactory;
+ private final StoreConfigurationChangeListener _storeChangeListener;
- public AuthenticationProviderRecoverer(AuthenticationProviderFactory authenticationProviderFactory)
+ public AuthenticationProviderRecoverer(AuthenticationProviderFactory authenticationProviderFactory, StoreConfigurationChangeListener storeChangeListener)
{
_authenticationProviderFactory = authenticationProviderFactory;
+ _storeChangeListener = storeChangeListener;
}
@Override
@@ -46,6 +52,44 @@ public class AuthenticationProviderRecov
Map<String, Object> attributes = configurationEntry.getAttributes();
AuthenticationProvider authenticationProvider = _authenticationProviderFactory.recover(configurationEntry.getId(), attributes, broker);
+ Map<String, Collection<ConfigurationEntry>> childEntries = configurationEntry.getChildren();
+
+ for (String type : childEntries.keySet())
+ {
+ recoverType(recovererProvider, _storeChangeListener, authenticationProvider, childEntries, type);
+ }
+
return authenticationProvider;
}
+
+ private void recoverType(RecovererProvider recovererProvider,
+ StoreConfigurationChangeListener storeChangeListener,
+ AuthenticationProvider authenticationProvider,
+ Map<String, Collection<ConfigurationEntry>> childEntries,
+ String type)
+ {
+ ConfiguredObjectRecoverer<?> recoverer = recovererProvider.getRecoverer(type);
+ if (recoverer == null)
+ {
+ throw new IllegalConfigurationException("Cannot recover entry for the type '" + type + "' from broker");
+ }
+ Collection<ConfigurationEntry> entries = childEntries.get(type);
+ for (ConfigurationEntry childEntry : entries)
+ {
+ ConfiguredObject object = recoverer.create(recovererProvider, childEntry, authenticationProvider);
+ if (object == null)
+ {
+ throw new IllegalConfigurationException("Cannot create configured object for the entry " + childEntry);
+ }
+ if (object instanceof PreferencesProvider)
+ {
+ authenticationProvider.setPreferencesProvider((PreferencesProvider)object);
+ }
+ else
+ {
+ throw new IllegalConfigurationException("Cannot associate " + object + " with authentication provider " + authenticationProvider);
+ }
+ object.addChangeListener(storeChangeListener);
+ }
+ }
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java Fri Sep 20 18:59:30 2013
@@ -47,6 +47,7 @@ import org.apache.qpid.server.model.adap
import org.apache.qpid.server.model.adapter.BrokerAdapter;
import org.apache.qpid.server.model.adapter.GroupProviderFactory;
import org.apache.qpid.server.model.adapter.PortFactory;
+import org.apache.qpid.server.model.adapter.PreferencesProviderCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -65,10 +66,13 @@ public class BrokerRecoverer implements
private final TaskExecutor _taskExecutor;
private final BrokerOptions _brokerOptions;
private final GroupProviderFactory _groupProviderFactory;
+ private final StoreConfigurationChangeListener _storeChangeListener;
+ private final PreferencesProviderCreator _preferencesProviderCreator;
public BrokerRecoverer(AuthenticationProviderFactory authenticationProviderFactory, GroupProviderFactory groupProviderFactory,
- AccessControlProviderFactory accessControlProviderFactory, PortFactory portFactory, StatisticsGatherer statisticsGatherer,
- VirtualHostRegistry virtualHostRegistry, LogRecorder logRecorder, RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor, BrokerOptions brokerOptions)
+ AccessControlProviderFactory accessControlProviderFactory, PortFactory portFactory, PreferencesProviderCreator preferencesProviderFactory, StatisticsGatherer statisticsGatherer,
+ VirtualHostRegistry virtualHostRegistry, LogRecorder logRecorder, RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor,
+ BrokerOptions brokerOptions, StoreConfigurationChangeListener storeChangeListener)
{
_groupProviderFactory = groupProviderFactory;
_portFactory = portFactory;
@@ -80,6 +84,8 @@ public class BrokerRecoverer implements
_rootMessageLogger = rootMessageLogger;
_taskExecutor = taskExecutor;
_brokerOptions = brokerOptions;
+ _storeChangeListener = storeChangeListener;
+ _preferencesProviderCreator = preferencesProviderFactory;
}
@Override
@@ -90,12 +96,11 @@ public class BrokerRecoverer implements
attributesCopy.put(Broker.MODEL_VERSION, Model.MODEL_VERSION);
- StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(entry.getStore());
BrokerAdapter broker = new BrokerAdapter(entry.getId(), attributesCopy, _statisticsGatherer, _virtualHostRegistry,
- _logRecorder, _rootMessageLogger, _authenticationProviderFactory, _groupProviderFactory, _accessControlProviderFactory,
- _portFactory, _taskExecutor, entry.getStore(), _brokerOptions);
+ _logRecorder, _rootMessageLogger, _authenticationProviderFactory,_groupProviderFactory, _accessControlProviderFactory,
+ _portFactory , _preferencesProviderCreator, _taskExecutor, entry.getStore(), _brokerOptions);
- broker.addChangeListener(storeChangeListener);
+ broker.addChangeListener(_storeChangeListener);
//Recover the SSL keystores / truststores first, then others that depend on them
Map<String, Collection<ConfigurationEntry>> childEntries = new HashMap<String, Collection<ConfigurationEntry>>(entry.getChildren());
@@ -117,11 +122,11 @@ public class BrokerRecoverer implements
for (String type : priorityChildEntries.keySet())
{
- recoverType(recovererProvider, storeChangeListener, broker, priorityChildEntries, type);
+ recoverType(recovererProvider, _storeChangeListener, broker, priorityChildEntries, type);
}
for (String type : childEntries.keySet())
{
- recoverType(recovererProvider, storeChangeListener, broker, childEntries, type);
+ recoverType(recovererProvider, _storeChangeListener, broker, childEntries, type);
}
return broker;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java Fri Sep 20 18:59:30 2013
@@ -32,12 +32,15 @@ import org.apache.qpid.server.model.Grou
import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AccessControlProviderFactory;
import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory;
import org.apache.qpid.server.model.adapter.GroupProviderFactory;
import org.apache.qpid.server.model.adapter.PortFactory;
+import org.apache.qpid.server.model.adapter.PreferencesProviderCreator;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.plugin.AccessControlFactory;
import org.apache.qpid.server.plugin.AuthenticationManagerFactory;
@@ -61,11 +64,14 @@ public class DefaultRecovererProvider im
private final QpidServiceLoader<PluginFactory> _pluginFactoryServiceLoader;
private final TaskExecutor _taskExecutor;
private final BrokerOptions _brokerOptions;
+ private final StoreConfigurationChangeListener _storeChangeListener;
+ private final PreferencesProviderCreator _preferencesProviderCreator;
public DefaultRecovererProvider(StatisticsGatherer brokerStatisticsGatherer, VirtualHostRegistry virtualHostRegistry,
- LogRecorder logRecorder, RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor, BrokerOptions brokerOptions)
+ LogRecorder logRecorder, RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor, BrokerOptions brokerOptions, StoreConfigurationChangeListener storeChangeListener)
{
- _authenticationProviderFactory = new AuthenticationProviderFactory(new QpidServiceLoader<AuthenticationManagerFactory>());
+ _preferencesProviderCreator = new PreferencesProviderCreator();
+ _authenticationProviderFactory = new AuthenticationProviderFactory(new QpidServiceLoader<AuthenticationManagerFactory>(), _preferencesProviderCreator);
_accessControlProviderFactory = new AccessControlProviderFactory(new QpidServiceLoader<AccessControlFactory>());
_groupProviderFactory = new GroupProviderFactory(new QpidServiceLoader<GroupManagerFactory>());
_portFactory = new PortFactory();
@@ -76,6 +82,7 @@ public class DefaultRecovererProvider im
_pluginFactoryServiceLoader = new QpidServiceLoader<PluginFactory>();
_taskExecutor = taskExecutor;
_brokerOptions = brokerOptions;
+ _storeChangeListener = storeChangeListener;
}
@Override
@@ -83,8 +90,8 @@ public class DefaultRecovererProvider im
{
if (Broker.class.getSimpleName().equals(type))
{
- return new BrokerRecoverer(_authenticationProviderFactory, _groupProviderFactory, _accessControlProviderFactory, _portFactory, _brokerStatisticsGatherer,
- _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, _brokerOptions);
+ return new BrokerRecoverer(_authenticationProviderFactory, _groupProviderFactory, _accessControlProviderFactory, _portFactory, _preferencesProviderCreator,
+ _brokerStatisticsGatherer, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, _brokerOptions, _storeChangeListener);
}
else if(VirtualHost.class.getSimpleName().equals(type))
{
@@ -96,7 +103,7 @@ public class DefaultRecovererProvider im
}
else if(AuthenticationProvider.class.getSimpleName().equals(type))
{
- return new AuthenticationProviderRecoverer(_authenticationProviderFactory);
+ return new AuthenticationProviderRecoverer(_authenticationProviderFactory, _storeChangeListener);
}
else if(Port.class.getSimpleName().equals(type))
{
@@ -114,6 +121,10 @@ public class DefaultRecovererProvider im
{
return new TrustStoreRecoverer();
}
+ else if(PreferencesProvider.class.getSimpleName().equals(type))
+ {
+ return new PreferencesProviderRecoverer(_preferencesProviderCreator);
+ }
else if(Plugin.class.getSimpleName().equals(type))
{
return new PluginRecoverer(_pluginFactoryServiceLoader);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java Fri Sep 20 18:59:30 2013
@@ -180,7 +180,12 @@ public class MemoryConfigurationEntrySto
return _entries.get(id);
}
- @Override
+ /**
+ * Copies the store into the given location
+ *
+ * @param target location to copy store into
+ * @throws IllegalConfigurationException if store cannot be copied into given location
+ */
public void copyTo(String copyLocation)
{
File file = new File(copyLocation);
@@ -286,7 +291,7 @@ public class MemoryConfigurationEntrySto
+ " can not be loaded by store of version " + STORE_VERSION);
}
- ConfigurationEntry brokerEntry = toEntry(node, Broker.class, _entries);
+ ConfigurationEntry brokerEntry = toEntry(node, Broker.class, _entries, null);
_rootId = brokerEntry.getId();
}
catch (IOException e)
@@ -365,7 +370,7 @@ public class MemoryConfigurationEntrySto
byte[] bytes = json.getBytes("UTF-8");
bais = new ByteArrayInputStream(bytes);
JsonNode node = loadJsonNodes(bais, _objectMapper);
- ConfigurationEntry brokerEntry = toEntry(node, Broker.class, _entries);
+ ConfigurationEntry brokerEntry = toEntry(node, Broker.class, _entries, null);
_rootId = brokerEntry.getId();
}
catch(Exception e)
@@ -485,7 +490,7 @@ public class MemoryConfigurationEntrySto
return root;
}
- private ConfigurationEntry toEntry(JsonNode parent, Class<? extends ConfiguredObject> expectedConfiguredObjectClass, Map<UUID, ConfigurationEntry> entries)
+ private ConfigurationEntry toEntry(JsonNode parent, Class<? extends ConfiguredObject> expectedConfiguredObjectClass, Map<UUID, ConfigurationEntry> entries, Class<? extends ConfiguredObject> parentClass)
{
Map<String, Object> attributes = null;
Set<UUID> childrenIds = new TreeSet<UUID>();
@@ -515,8 +520,22 @@ public class MemoryConfigurationEntrySto
if (element.isObject())
{
Class<? extends ConfiguredObject> expectedChildConfiguredObjectClass = _relationshipClasses.get(fieldName);
+ if (expectedChildConfiguredObjectClass == null && expectedConfiguredObjectClass != null)
+ {
+ Collection<Class<? extends ConfiguredObject>> childTypes = Model.getInstance().getChildTypes(expectedConfiguredObjectClass);
+ for (Class<? extends ConfiguredObject> childType : childTypes)
+ {
+ String relationship = childType.getSimpleName().toLowerCase();
+ relationship += relationship.endsWith("s") ? "es": "s";
+ if (fieldName.equals(relationship))
+ {
+ expectedChildConfiguredObjectClass = childType;
+ break;
+ }
+ }
+ }
// assuming it is a child node
- ConfigurationEntry entry = toEntry(element, expectedChildConfiguredObjectClass, entries);
+ ConfigurationEntry entry = toEntry(element, expectedChildConfiguredObjectClass, entries, expectedConfiguredObjectClass);
childrenIds.add(entry.getId());
}
else
@@ -531,6 +550,10 @@ public class MemoryConfigurationEntrySto
if (fieldValues != null)
{
Object[] array = fieldValues.toArray(new Object[fieldValues.size()]);
+ if (attributes == null)
+ {
+ attributes = new HashMap<String, Object>();
+ }
attributes.put(fieldName, array);
}
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Sep 20 18:59:30 2013
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -39,7 +38,6 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -57,7 +55,7 @@ import java.util.concurrent.atomic.Atomi
public abstract class AbstractExchange implements Exchange
{
private static final Logger _logger = Logger.getLogger(AbstractExchange.class);
- private AMQShortString _name;
+ private String _name;
private final AtomicBoolean _closed = new AtomicBoolean();
private Exchange _alternateExchange;
@@ -98,19 +96,15 @@ public abstract class AbstractExchange i
_type = type;
}
- public AMQShortString getNameShortString()
- {
- return _name;
- }
-
- public final AMQShortString getTypeShortString()
+ @Override
+ public String getTypeName()
{
- return _type.getName();
+ return _type.getType();
}
public void initialise(UUID id,
VirtualHost host,
- AMQShortString name,
+ String name,
boolean durable,
boolean autoDelete)
throws AMQException
@@ -124,7 +118,7 @@ public abstract class AbstractExchange i
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
// Log Exchange creation
- CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
+ CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable));
}
public boolean isDurable()
@@ -159,7 +153,7 @@ public abstract class AbstractExchange i
public String toString()
{
- return getClass().getSimpleName() + "[" + getNameShortString() +"]";
+ return getClass().getSimpleName() + "[" + getName() +"]";
}
public VirtualHost getVirtualHost()
@@ -167,16 +161,6 @@ public abstract class AbstractExchange i
return _virtualHost;
}
- public QueueRegistry getQueueRegistry()
- {
- return getVirtualHost().getQueueRegistry();
- }
-
- public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
- {
- return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
- }
-
public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
{
for(Binding b : _bindings)
@@ -191,11 +175,6 @@ public abstract class AbstractExchange i
return false;
}
- public final boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(routingKey==null ? "" : routingKey.asString(), queue);
- }
-
public final boolean isBound(String bindingKey, AMQQueue queue)
{
for(Binding b : _bindings)
@@ -208,11 +187,6 @@ public abstract class AbstractExchange i
return false;
}
- public final boolean isBound(AMQShortString routingKey)
- {
- return isBound(routingKey == null ? "" : routingKey.asString());
- }
-
public final boolean isBound(String bindingKey)
{
for(Binding b : _bindings)
@@ -420,7 +394,7 @@ public abstract class AbstractExchange i
}
if(_logger.isDebugEnabled())
{
- _logger.debug("Exchange: " + getName() + " - attempt to enqueue message onto deleted queue " + String.valueOf(q.getNameShortString()));
+ _logger.debug("Exchange: " + getName() + " - attempt to enqueue message onto deleted queue " + q.getName());
}
queues.remove(q);
}
@@ -536,7 +510,7 @@ public abstract class AbstractExchange i
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
// Check access
- if (!_virtualHost.getSecurityManager().authoriseUnbind(this, new AMQShortString(bindingKey), queue))
+ if (!_virtualHost.getSecurityManager().authoriseUnbind(this, bindingKey, queue))
{
throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
}
@@ -601,7 +575,7 @@ public abstract class AbstractExchange i
}
//Perform ACLs
- if (!_virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, new AMQShortString(bindingKey)))
+ if (!_virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, bindingKey))
{
throw new AMQSecurityException("Permission denied: binding " + bindingKey);
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Fri Sep 20 18:59:30 2013
@@ -32,7 +32,6 @@ import org.apache.qpid.AMQInternalExcept
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -47,6 +46,7 @@ import org.apache.qpid.server.virtualhos
public class DefaultExchange implements Exchange
{
+ private final QueueRegistry _queueRegistry;
private UUID _id;
private VirtualHost _virtualHost;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
@@ -55,11 +55,16 @@ public class DefaultExchange implements
private LogSubject _logSubject;
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
+ public DefaultExchange(QueueRegistry queueRegistry)
+ {
+ _queueRegistry = queueRegistry;
+ }
+
@Override
public void initialise(UUID id,
VirtualHost host,
- AMQShortString name,
+ String name,
boolean durable,
boolean autoDelete) throws AMQException
{
@@ -70,7 +75,7 @@ public class DefaultExchange implements
@Override
public String getName()
{
- return ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString();
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
@Override
@@ -82,7 +87,7 @@ public class DefaultExchange implements
@Override
public long getBindingCount()
{
- return _virtualHost.getQueueRegistry().getQueues().size();
+ return _virtualHost.getQueues().size();
}
@Override
@@ -146,7 +151,7 @@ public class DefaultExchange implements
@Override
public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
{
- if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
+ if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
{
return convertToBinding(queue);
}
@@ -161,7 +166,7 @@ public class DefaultExchange implements
{
String queueName = queue.getName();
- UUID exchangeId = UUIDGenerator.generateBindingUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(),
+ UUID exchangeId = UUIDGenerator.generateBindingUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
queueName,
queueName,
_virtualHost.getName());
@@ -170,15 +175,9 @@ public class DefaultExchange implements
}
@Override
- public AMQShortString getNameShortString()
- {
- return AMQShortString.EMPTY_STRING;
- }
-
- @Override
- public AMQShortString getTypeShortString()
+ public String getTypeName()
{
- return getType().getName();
+ return getType().getType();
}
@Override
@@ -207,7 +206,7 @@ public class DefaultExchange implements
@Override
public List<AMQQueue> route(InboundMessage message)
{
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey());
+ AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
{
List<AMQQueue> noQueues = Collections.emptyList();
@@ -221,27 +220,9 @@ public class DefaultExchange implements
}
@Override
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey, queue) && (arguments == null || arguments.isEmpty());
- }
-
- @Override
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(routingKey) && isBound(queue) && queue.getNameShortString().equals(routingKey); //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean isBound(AMQShortString routingKey)
- {
- return _virtualHost.getQueueRegistry().getQueue(routingKey) != null;
- }
-
- @Override
public boolean isBound(AMQQueue queue)
{
- return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue;
+ return _virtualHost.getQueue(queue.getName()) == queue;
}
@Override
@@ -283,7 +264,7 @@ public class DefaultExchange implements
@Override
public boolean isBound(String bindingKey)
{
- return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
+ return _virtualHost.getQueue(bindingKey) != null;
}
@Override
@@ -320,7 +301,7 @@ public class DefaultExchange implements
public Collection<Binding> getBindings()
{
List<Binding> bindings = new ArrayList<Binding>();
- for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues())
+ for(AMQQueue q : _virtualHost.getQueues())
{
bindings.add(convertToBinding(q));
}
@@ -330,7 +311,7 @@ public class DefaultExchange implements
@Override
public void addBindingListener(BindingListener listener)
{
- _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates.
+ _queueRegistry.addRegistryChangeListener(convertListener(listener));
}
private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener)
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Fri Sep 20 18:59:30 2013
@@ -44,14 +44,14 @@ public class DefaultExchangeFactory impl
private static final Logger LOGGER = Logger.getLogger(DefaultExchangeFactory.class);
- private static final AMQShortString[] BASE_EXCHANGE_TYPES =
- new AMQShortString[]{ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
- ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
- ExchangeDefaults.HEADERS_EXCHANGE_CLASS,
- ExchangeDefaults.TOPIC_EXCHANGE_CLASS};
+ private static final String[] BASE_EXCHANGE_TYPES =
+ new String[]{ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
+ ExchangeDefaults.HEADERS_EXCHANGE_CLASS,
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS};
private final VirtualHost _host;
- private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
+ private Map<String, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<String, ExchangeType<? extends Exchange>>();
public DefaultExchangeFactory(VirtualHost host)
{
@@ -61,7 +61,7 @@ public class DefaultExchangeFactory impl
Iterable<ExchangeType> exchangeTypes = loadExchangeTypes();
for (ExchangeType<?> exchangeType : exchangeTypes)
{
- AMQShortString typeName = exchangeType.getName();
+ String typeName = exchangeType.getType();
if(LOGGER.isDebugEnabled())
{
@@ -80,11 +80,11 @@ public class DefaultExchangeFactory impl
_exchangeClassMap.put(typeName, exchangeType);
}
- for(AMQShortString type : BASE_EXCHANGE_TYPES)
+ for(String type : BASE_EXCHANGE_TYPES)
{
if(!_exchangeClassMap.containsKey(type))
{
- throw new IllegalStateException("Did not find expected exchange type: " + type.asString());
+ throw new IllegalStateException("Did not find expected exchange type: " + type);
}
}
}
@@ -114,19 +114,12 @@ public class DefaultExchangeFactory impl
{
UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName());
- return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
+ return createExchange(id, exchange, type, durable, autoDelete);
}
public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
{
- return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
- }
-
- private Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete)
- throws AMQException
- {
// Check access
if (!_host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type))
{
@@ -143,4 +136,10 @@ public class DefaultExchangeFactory impl
Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete);
return e;
}
+
+ @Override
+ public Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) throws AMQException
+ {
+ return createExchange(id, exchange, type, true, autoDelete);
+ }
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Sep 20 18:59:30 2013
@@ -27,6 +27,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -40,20 +41,23 @@ import java.util.concurrent.ConcurrentMa
public class DefaultExchangeRegistry implements ExchangeRegistry
{
private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
-
/**
* Maps from exchange name to exchange instance
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
- private VirtualHost _host;
+
+ private final VirtualHost _host;
+ private final QueueRegistry _queueRegistry;
+
private final Collection<RegistryChangeListener> _listeners =
Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
- public DefaultExchangeRegistry(VirtualHost host)
+ public DefaultExchangeRegistry(VirtualHost host, QueueRegistry queueRegistry)
{
_host = host;
+ _queueRegistry = queueRegistry;
}
public void initialise(ExchangeFactory exchangeFactory) throws AMQException
@@ -61,10 +65,10 @@ public class DefaultExchangeRegistry imp
//create 'standard' exchanges:
new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
- _defaultExchange = new DefaultExchange();
+ _defaultExchange = new DefaultExchange(_queueRegistry);
UUID defaultExchangeId =
- UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
+ UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, _host.getName());
_defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false);
@@ -77,7 +81,7 @@ public class DefaultExchangeRegistry imp
public void registerExchange(Exchange exchange) throws AMQException
{
- _exchangeMap.put(exchange.getNameShortString().toString(), exchange);
+ _exchangeMap.put(exchange.getName(), exchange);
synchronized (_listeners)
{
for(RegistryChangeListener listener : _listeners)
@@ -197,7 +201,7 @@ public class DefaultExchangeRegistry imp
public boolean isReservedExchangeName(String name)
{
- if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name)
+ if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
|| name.startsWith("amq.") || name.startsWith("qpid."))
{
return true;
@@ -205,7 +209,7 @@ public class DefaultExchangeRegistry imp
Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeTypes();
for (ExchangeType<? extends Exchange> type : registeredTypes)
{
- if (type.getDefaultExchangeName().toString().equals(name))
+ if (type.getDefaultExchangeName().equals(name))
{
return true;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Fri Sep 20 18:59:30 2013
@@ -180,10 +180,9 @@ public class DirectExchange extends Abst
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getQueue();
- AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
assert queue != null;
- assert routingKey != null;
+ assert bindingKey != null;
BindingSet bindings = _bindingsByKey.get(bindingKey);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java Fri Sep 20 18:59:30 2013
@@ -33,16 +33,11 @@ public class DirectExchangeType implemen
@Override
public String getType()
{
- return getName().toString();
- }
-
- public AMQShortString getName()
- {
return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
}
public DirectExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
+ String name,
boolean durable,
boolean autoDelete) throws AMQException
{
@@ -51,7 +46,7 @@ public class DirectExchangeType implemen
return exch;
}
- public AMQShortString getDefaultExchangeName()
+ public String getDefaultExchangeName()
{
return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Sep 20 18:59:30 2013
@@ -23,8 +23,6 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -39,7 +37,7 @@ import java.util.UUID;
public interface Exchange extends ExchangeReferrer
{
- void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
+ void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -47,11 +45,9 @@ public interface Exchange extends Exchan
String getName();
- AMQShortString getNameShortString();
-
ExchangeType getType();
- AMQShortString getTypeShortString();
+ String getTypeName();
boolean isDurable();
@@ -107,30 +103,32 @@ public interface Exchange extends Exchan
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
- * @param routingKey
+ * @param bindingKey
* @param arguments
* @param queue
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
+ boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key
- * @param routingKey
+ * @param bindingKey
* @param queue
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey, AMQQueue queue);
+
+ boolean isBound(String bindingKey, AMQQueue queue);
/**
* Determines whether a message is routing to any queue using a specific _routing key
- * @param routingKey
+ * @param bindingKey
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey);
+ boolean isBound(String bindingKey);
/**
* Returns true if this exchange has at least one binding associated with it.
@@ -141,20 +139,14 @@ public interface Exchange extends Exchan
Collection<Binding> getBindings();
- boolean isBound(String bindingKey);
-
boolean isBound(AMQQueue queue);
boolean isBound(Map<String, Object> arguments);
- boolean isBound(String bindingKey, AMQQueue queue);
-
boolean isBound(String bindingKey, Map<String, Object> arguments);
boolean isBound(Map<String, Object> arguments, AMQQueue queue);
- boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
-
void removeReference(ExchangeReferrer exchange);
void addReference(ExchangeReferrer exchange);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Fri Sep 20 18:59:30 2013
@@ -38,5 +38,6 @@ public interface ExchangeFactory
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
+ Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) throws AMQException;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Fri Sep 20 18:59:30 2013
@@ -22,7 +22,6 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -33,7 +32,7 @@ public class ExchangeInitialiser
{
for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
{
- define (registry, factory, type.getDefaultExchangeName().toString(), type.getName().toString(), store);
+ define (registry, factory, type.getDefaultExchangeName(), type.getType(), store);
}
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java Fri Sep 20 18:59:30 2013
@@ -33,15 +33,10 @@ public class FanoutExchangeType implemen
@Override
public String getType()
{
- return getName().toString();
- }
-
- public AMQShortString getName()
- {
return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
}
- public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name,
+ public FanoutExchange newInstance(UUID id, VirtualHost host, String name,
boolean durable, boolean autoDelete)
throws AMQException
{
@@ -50,7 +45,7 @@ public class FanoutExchangeType implemen
return exch;
}
- public AMQShortString getDefaultExchangeName()
+ public String getDefaultExchangeName()
{
return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Fri Sep 20 18:59:30 2013
@@ -43,13 +43,6 @@ public class FilterSupport
private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache =
Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>());
- static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
- {
- final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- return getMessageFilter(selectorString);
- }
-
-
static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException
{
final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.toString());
@@ -85,18 +78,20 @@ public class FilterSupport
return selector;
}
- static boolean argumentsContainFilter(final FieldTable args)
+ public static boolean argumentsContainFilter(final Map<String, Object> args)
{
return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
}
- static boolean argumentsContainFilter(final Map<String, Object> args)
+ public static void removeFilters(final Map<String, Object> args)
{
- return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ args.remove(AMQPFilterTypes.JMS_SELECTOR.toString());
+ args.remove(AMQPFilterTypes.NO_LOCAL.toString());
}
+
static boolean argumentsContainNoLocal(final Map<String, Object> args)
{
return args != null
@@ -104,29 +99,12 @@ public class FilterSupport
&& Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString()));
}
-
- static boolean argumentsContainNoLocal(final FieldTable args)
- {
- return args != null
- && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
- && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
- }
-
-
static boolean argumentsContainJMSSelector(final Map<String,Object> args)
{
return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String)
&& ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
}
-
- static boolean argumentsContainJMSSelector(final FieldTable args)
- {
- return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
- && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
- }
-
-
static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
{
if(argumentsContainNoLocal(args))
@@ -145,24 +123,6 @@ public class FilterSupport
}
}
- static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
- {
- if(argumentsContainNoLocal(args))
- {
- MessageFilter filter = new NoLocalFilter(queue);
-
- if(argumentsContainJMSSelector(args))
- {
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
- }
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
- }
- }
-
static final class NoLocalFilter implements MessageFilter
{
private final AMQQueue _queue;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Fri Sep 20 18:59:30 2013
@@ -89,7 +89,7 @@ public class HeadersExchange extends Abs
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader());
+ _logger.debug("Exchange " + getName() + ": routing message with headers " + payload.getMessageHeader());
}
LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
@@ -104,8 +104,8 @@ public class HeadersExchange extends Abs
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- payload.getMessageHeader() + " to " + b.getQueue().getNameShortString());
+ _logger.debug("Exchange " + getName() + ": delivering message with headers " +
+ payload.getMessageHeader() + " to " + b.getQueue().getName());
}
queues.add(b.getQueue());
}
@@ -118,11 +118,10 @@ public class HeadersExchange extends Abs
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getQueue();
- AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
Map<String,Object> args = binding.getArguments();
assert queue != null;
- assert routingKey != null;
+ assert bindingKey != null;
CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
@@ -138,7 +137,7 @@ public class HeadersExchange extends Abs
if(_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
+ _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
" with binding key '" +bindingKey + "' and args: " + args);
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java Fri Sep 20 18:59:30 2013
@@ -33,15 +33,10 @@ public class HeadersExchangeType impleme
@Override
public String getType()
{
- return getName().toString();
- }
-
- public AMQShortString getName()
- {
return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
}
- public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable,
+ public HeadersExchange newInstance(UUID id, VirtualHost host, String name, boolean durable,
boolean autoDelete) throws AMQException
{
HeadersExchange exch = new HeadersExchange();
@@ -50,7 +45,7 @@ public class HeadersExchangeType impleme
return exch;
}
- public AMQShortString getDefaultExchangeName()
+ public String getDefaultExchangeName()
{
return ExchangeDefaults.HEADERS_EXCHANGE_NAME;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Fri Sep 20 18:59:30 2013
@@ -50,10 +50,10 @@ public class TopicExchange extends Abstr
private final TopicParser _parser = new TopicParser();
- private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
- new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
+ private final Map<String, TopicExchangeResult> _topicExchangeResults =
+ new ConcurrentHashMap<String, TopicExchangeResult>();
- private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
+ private final Map<Binding, Map<String,Object>> _bindings = new HashMap<Binding, Map<String,Object>>();
public TopicExchange()
{
@@ -62,21 +62,21 @@ public class TopicExchange extends Abstr
protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
{
- AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
+ final String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getQueue();
- FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
+ Map<String,Object> args = binding.getArguments();
assert queue != null;
- assert rKey != null;
+ assert bindingKey != null;
- _logger.debug("Registering queue " + queue.getNameShortString() + " with routing key " + rKey);
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey);
- AMQShortString routingKey = TopicNormalizer.normalize(rKey);
+ String routingKey = TopicNormalizer.normalize(bindingKey);
if(_bindings.containsKey(binding))
{
- FieldTable oldArgs = _bindings.get(binding);
+ Map<String,Object> oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(FilterSupport.argumentsContainFilter(args))
@@ -150,9 +150,9 @@ public class TopicExchange extends Abstr
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- final AMQShortString routingKey = payload.getRoutingKeyShortString() == null
- ? AMQShortString.EMPTY_STRING
- : payload.getRoutingKeyShortString();
+ final String routingKey = payload.getRoutingKey() == null
+ ? ""
+ : payload.getRoutingKey();
final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
@@ -181,8 +181,8 @@ public class TopicExchange extends Abstr
{
if(_bindings.containsKey(binding))
{
- FieldTable bindingArgs = _bindings.remove(binding);
- AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
+ Map<String,Object> bindingArgs = _bindings.remove(binding);
+ String bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
result.removeBinding(binding);
@@ -211,7 +211,7 @@ public class TopicExchange extends Abstr
}
}
- private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, String routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java Fri Sep 20 18:59:30 2013
@@ -33,16 +33,11 @@ public class TopicExchangeType implement
@Override
public String getType()
{
- return getName().toString();
- }
-
- public AMQShortString getName()
- {
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
public TopicExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
+ String name,
boolean durable,
boolean autoDelete) throws AMQException
{
@@ -51,7 +46,7 @@ public class TopicExchangeType implement
return exch;
}
- public AMQShortString getDefaultExchangeName()
+ public String getDefaultExchangeName()
{
return ExchangeDefaults.TOPIC_EXCHANGE_NAME;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java Fri Sep 20 18:59:30 2013
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.exchange.topic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,7 +44,7 @@ public class TopicMatcherDFAState
private final Collection<TopicMatcherResult> _results;
private final Map<TopicWord, TopicMatcherDFAState> _nextStateMap;
- private static final byte TOPIC_DELIMITTER = (byte)'.';
+ private static final String TOPIC_DELIMITTER = "\\.";
public TopicMatcherDFAState(Map<TopicWord, TopicMatcherDFAState> nextStateMap,
@@ -67,19 +67,19 @@ public class TopicMatcherDFAState
}
- public Collection<TopicMatcherResult> parse(TopicWordDictionary dictionary, AMQShortString routingKey)
+ public Collection<TopicMatcherResult> parse(TopicWordDictionary dictionary, String routingKey)
{
- return parse(dictionary, routingKey.tokenize(TOPIC_DELIMITTER));
+ return parse(dictionary, Arrays.asList(routingKey.split(TOPIC_DELIMITTER)).iterator());
}
private Collection<TopicMatcherResult> parse(final TopicWordDictionary dictionary,
- final AMQShortStringTokenizer tokens)
+ final Iterator<String> tokens)
{
- if(!tokens.hasMoreTokens())
+ if(!tokens.hasNext())
{
return _results;
}
- TopicWord word = dictionary.getWord(tokens.nextToken());
+ TopicWord word = dictionary.getWord(tokens.next());
TopicMatcherDFAState nextState = _nextStateMap.get(word);
if(nextState == null && word != TopicWord.ANY_WORD)
{
@@ -96,7 +96,7 @@ public class TopicMatcherDFAState
}
return nextState.parse(dictionary, tokens);
-
+
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java Fri Sep 20 18:59:30 2013
@@ -20,46 +20,36 @@
*/
package org.apache.qpid.server.exchange.topic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
-
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
public class TopicNormalizer
{
- private static final byte TOPIC_SEPARATOR = (byte)'.';
- private static final byte HASH_BYTE = (byte)'#';
- private static final byte STAR_BYTE = (byte)'*';
-
- private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
- private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
- private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
+
+ private static final String STAR_TOKEN = "*";
+ private static final String HASH_TOKEN = "#";
+ private static final String SEPARATOR = ".";
+
private TopicNormalizer()
{
}
- public static AMQShortString normalize(AMQShortString routingKey)
+ public static String normalize(String routingKey)
{
if(routingKey == null)
{
- return AMQShortString.EMPTY_STRING;
+ return "";
}
- else if(!(routingKey.contains(HASH_BYTE) || routingKey.contains(STAR_BYTE)))
+ else if(!(routingKey.contains(HASH_TOKEN) || !routingKey.contains(STAR_TOKEN)))
{
return routingKey;
}
else
{
- AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
-
- List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
-
- while (routingTokens.hasMoreTokens())
- {
- subscriptionList.add(routingTokens.nextToken());
- }
+ List<String> subscriptionList = new ArrayList<String>(Arrays.asList(routingKey.split("\\.")));
int size = subscriptionList.size();
@@ -68,9 +58,9 @@ public class TopicNormalizer
// if there are more levels
if ((index + 1) < size)
{
- if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
+ if (subscriptionList.get(index).equals(HASH_TOKEN))
{
- if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
+ if (subscriptionList.get(index + 1).equals(HASH_TOKEN))
{
// we don't need #.# delete this one
subscriptionList.remove(index);
@@ -79,7 +69,7 @@ public class TopicNormalizer
index--;
}
- if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
+ if (subscriptionList.get(index + 1).equals(STAR_TOKEN))
{
// we don't want #.* swap to *.#
// remove it and put it in at index + 1
@@ -89,11 +79,14 @@ public class TopicNormalizer
} // if we have more levels
}
-
-
- AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
-
- return normalizedString;
+ Iterator<String> iter = subscriptionList.iterator();
+ StringBuilder builder = new StringBuilder(iter.next());
+ while(iter.hasNext())
+ {
+ builder.append(SEPARATOR).append(iter.next());
+ }
+ return builder.toString();
}
}
+
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java Fri Sep 20 18:59:30 2013
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.exchange.topic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -35,7 +32,7 @@ import java.util.concurrent.atomic.Atomi
public class TopicParser
{
- private static final byte TOPIC_DELIMITER = (byte)'.';
+ private static final String TOPIC_DELIMITER = "\\.";
private final TopicWordDictionary _dictionary = new TopicWordDictionary();
private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
@@ -98,7 +95,7 @@ public class TopicParser
}
- public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
+ public void addBinding(String bindingKey, TopicMatcherResult result)
{
TopicMatcherDFAState startingStateMachine;
@@ -121,7 +118,7 @@ public class TopicParser
}
- public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
+ public Collection<TopicMatcherResult> parse(String routingKey)
{
TopicMatcherDFAState stateMachine = _stateMachine.get();
if(stateMachine == null)
@@ -135,7 +132,7 @@ public class TopicParser
}
- TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
+ TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result)
{
List<TopicWord> wordList = createTopicWordList(bindingKey);
int wildCards = 0;
@@ -422,16 +419,16 @@ public class TopicParser
}
- private List<TopicWord> createTopicWordList(final AMQShortString bindingKey)
+ private List<TopicWord> createTopicWordList(final String bindingKey)
{
- AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER);
+ String[] tokens = bindingKey.split(TOPIC_DELIMITER);
TopicWord previousWord = null;
List<TopicWord> wordList = new ArrayList<TopicWord>();
- while(tokens.hasMoreTokens())
+ for(String token : tokens)
{
- TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken());
+ TopicWord nextWord = _dictionary.getOrCreateWord(token);
if(previousWord == TopicWord.WILDCARD_WORD)
{
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java Fri Sep 20 18:59:30 2013
@@ -21,29 +21,17 @@
package org.apache.qpid.server.exchange.topic;
-import org.apache.qpid.framing.AMQShortString;
-
public final class TopicWord
{
public static final TopicWord ANY_WORD = new TopicWord("*");
public static final TopicWord WILDCARD_WORD = new TopicWord("#");
private String _word;
- public TopicWord()
- {
-
- }
-
public TopicWord(String s)
{
_word = s;
}
- public TopicWord(final AMQShortString name)
- {
- _word = name.toString();
- }
-
public String toString()
{
return _word;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java Fri Sep 20 18:59:30 2013
@@ -20,27 +20,20 @@
*/
package org.apache.qpid.server.exchange.topic;
-import org.apache.qpid.framing.AMQShortString;
-
import java.util.concurrent.ConcurrentHashMap;
public class TopicWordDictionary
{
- private final ConcurrentHashMap<AMQShortString,TopicWord> _dictionary =
- new ConcurrentHashMap<AMQShortString,TopicWord>();
-
-
+ private final ConcurrentHashMap<String,TopicWord> _dictionary =
+ new ConcurrentHashMap<String,TopicWord>();
public TopicWordDictionary()
{
- _dictionary.put(new AMQShortString("*"), TopicWord.ANY_WORD);
- _dictionary.put(new AMQShortString("#"), TopicWord.WILDCARD_WORD);
+ _dictionary.put("*", TopicWord.ANY_WORD);
+ _dictionary.put("#", TopicWord.WILDCARD_WORD);
}
-
-
-
- public TopicWord getOrCreateWord(AMQShortString name)
+ public TopicWord getOrCreateWord(String name)
{
TopicWord word = _dictionary.putIfAbsent(name, new TopicWord(name));
if(word == null)
@@ -51,7 +44,7 @@ public class TopicWordDictionary
}
- public TopicWord getWord(AMQShortString name)
+ public TopicWord getWord(String name)
{
TopicWord word = _dictionary.get(name);
if(word == null)
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Fri Sep 20 18:59:30 2013
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.filter;
@@ -35,7 +35,7 @@ import java.util.Map;
public class FilterManagerFactory
{
-
+
private final static Logger _logger = Logger.getLogger(FilterManagerFactory.class);
private FilterManagerFactory()
@@ -44,25 +44,23 @@ public class FilterManagerFactory
//fixme move to a common class so it can be refered to from client code.
- public static FilterManager createManager(FieldTable filters) throws AMQException
+ public static FilterManager createManager(Map<String,Object> filters) throws AMQException
{
FilterManager manager = null;
if (filters != null)
{
-
-
- if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
+ if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.toString()))
{
- String selector = filters.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ Object selector = filters.get(AMQPFilterTypes.JMS_SELECTOR.toString());
- if (selector != null && !selector.equals(""))
+ if (selector instanceof String && !selector.equals(""))
{
manager = new SimpleFilterManager();
try
{
- manager.add(new JMSSelectorFilter(selector));
+ manager.add(new JMSSelectorFilter((String)selector));
}
catch (ParseException e)
{
@@ -91,9 +89,5 @@ public class FilterManagerFactory
return manager;
}
-
- public static FilterManager createManager(Map<String,Object> map) throws AMQException
- {
- return createManager(FieldTable.convertToFieldTable(map));
- }
+
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogRecorder.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogRecorder.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogRecorder.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogRecorder.java Fri Sep 20 18:59:30 2013
@@ -25,15 +25,17 @@ import org.apache.log4j.spi.ErrorHandler
import org.apache.log4j.spi.Filter;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
+import org.apache.qpid.server.configuration.BrokerProperties;
public class LogRecorder implements Appender, Iterable<LogRecorder.Record>
{
+ private static final int DEFAULT_BUFFER_SIZE = 4096;
private ErrorHandler _errorHandler;
private Filter _filter;
private String _name;
private long _recordId;
- private final int _bufferSize = 4096;
+ private final int _bufferSize = Integer.getInteger(BrokerProperties.PROPERTY_LOG_RECORDS_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
private final int _mask = _bufferSize - 1;
private Record[] _records = new Record[_bufferSize];
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.logging.actors;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -45,7 +45,7 @@ public class AMQPChannelActor extends Ab
* @param channel The Channel for this LogActor
* @param rootLogger The root Logger that this LogActor should use
*/
- public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger)
{
super(rootLogger);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java Fri Sep 20 18:59:30 2013
@@ -14,16 +14,15 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-
+import org.apache.qpid.server.protocol.AMQConnectionModel;
/**
@@ -39,7 +38,7 @@ public class AMQPConnectionActor extends
{
private ConnectionLogSubject _logSubject;
- public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger)
{
super(rootLogger);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Fri Sep 20 18:59:30 2013
@@ -42,9 +42,9 @@ public class BindingLogSubject extends A
{
setLogStringWithFormat(BINDING_FORMAT,
queue.getVirtualHost().getName(),
- exchange.getTypeShortString(),
- exchange.getNameShortString(),
- queue.getNameShortString(),
+ exchange.getType().getType(),
+ exchange.getName(),
+ queue.getName(),
routingKey);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org