You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/03 15:56:42 UTC
svn commit: r1663717 [2/6] - in
/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: ./
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/
amqp-1-0-common/src/main/java/org/a...
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Tue Mar 3 14:56:40 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.binding;
-import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -45,10 +44,8 @@ import org.apache.qpid.server.model.Mana
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class BindingImpl
extends AbstractConfiguredObject<BindingImpl>
@@ -108,26 +105,6 @@ public class BindingImpl
}
}
- @Override
- protected void onCreate()
- {
- super.onCreate();
- try
- {
- _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
- }
- catch(AccessControlException e)
- {
- deleted();
- throw e;
- }
- if (isDurable())
- {
- _queue.getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
- }
-
- }
-
private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes,
final AMQQueue queue,
final ExchangeImpl exchange)
@@ -263,12 +240,6 @@ public class BindingImpl
{
_arguments = arguments;
BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
- if (isDurable())
- {
- VirtualHostImpl<?, ?, ?> vhost =
- (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
- vhost.getDurableConfigurationStore().update(true, asObjectRecord());
- }
}
}
);
@@ -278,6 +249,8 @@ public class BindingImpl
@Override
public void validateOnCreate()
{
+ _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+
AMQQueue queue = getAMQQueue();
Map<String, Object> arguments = getArguments();
if (arguments!=null && !arguments.isEmpty() && FilterSupport.argumentsContainFilter(arguments))
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java Tue Mar 3 14:56:40 2015
@@ -48,6 +48,7 @@ public class BrokerProperties
public static final String PROPERTY_QPID_HOME = "QPID_HOME";
public static final String PROPERTY_QPID_WORK = "QPID_WORK";
public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size";
+ public static final String POSIX_FILE_PERMISSIONS = "qpid.default_posix_file_permissions";
private BrokerProperties()
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java Tue Mar 3 14:56:40 2015
@@ -25,7 +25,6 @@ import java.util.Collection;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
public class StoreConfigurationChangeListener implements ConfigurationChangeListener
@@ -43,7 +42,10 @@ public class StoreConfigurationChangeLis
{
if (newState == State.DELETED)
{
- _store.remove(object.asObjectRecord());
+ if(object.isDurable())
+ {
+ _store.remove(object.asObjectRecord());
+ }
object.removeChangeListener(this);
}
}
@@ -51,20 +53,23 @@ public class StoreConfigurationChangeLis
@Override
public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
{
- // exclude VirtualHostNode children from storing in broker store
- if (!(object instanceof VirtualHostNode))
+ if (!object.managesChildStorage())
{
- child.addChangeListener(this);
- _store.update(true,child.asObjectRecord());
+ if(object.isDurable() && child.isDurable())
+ {
+ child.addChangeListener(this);
+ _store.update(true, child.asObjectRecord());
- Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
- Collection<Class<? extends ConfiguredObject>> childTypes = child.getModel().getChildTypes(categoryClass);
+ Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
+ Collection<Class<? extends ConfiguredObject>> childTypes =
+ child.getModel().getChildTypes(categoryClass);
- for(Class<? extends ConfiguredObject> childClass : childTypes)
- {
- for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ for (Class<? extends ConfiguredObject> childClass : childTypes)
{
- childAdded(child, grandchild);
+ for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ {
+ childAdded(child, grandchild);
+ }
}
}
}
@@ -74,14 +79,20 @@ public class StoreConfigurationChangeLis
@Override
public void childRemoved(ConfiguredObject object, ConfiguredObject child)
{
- _store.remove(child.asObjectRecord());
+ if(child.isDurable())
+ {
+ _store.remove(child.asObjectRecord());
+ }
child.removeChangeListener(this);
}
@Override
public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
- _store.update(false, object.asObjectRecord());
+ if(object.isDurable())
+ {
+ _store.update(false, object.asObjectRecord());
+ }
}
@Override
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Mar 3 14:56:40 2015
@@ -177,17 +177,6 @@ public abstract class AbstractExchange<T
}
@Override
- protected void onCreate()
- {
- super.onCreate();
- if(isDurable())
- {
- getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
- }
-
- }
-
- @Override
public EventLogger getEventLogger()
{
return _virtualHost.getEventLogger();
@@ -213,12 +202,6 @@ public abstract class AbstractExchange<T
throw new RequiredExchangeException(getName());
}
- if (isDurable() && !isAutoDelete())
- {
- getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
- }
-
if(_closed.compareAndSet(false,true))
{
List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
@@ -241,11 +224,6 @@ public abstract class AbstractExchange<T
}
_closeTaskList.clear();
- if (isDurable() && !isAutoDelete())
- {
- getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
- }
}
deleted();
}
@@ -665,10 +643,6 @@ public abstract class AbstractExchange<T
doRemoveBinding(b);
queue.removeBinding(b);
- if (b.isDurable())
- {
- _virtualHost.getDurableConfigurationStore().remove(b.asObjectRecord());
- }
b.delete();
}
@@ -905,10 +879,6 @@ public abstract class AbstractExchange<T
protected void changeAttributes(final Map<String, Object> attributes)
{
super.changeAttributes(attributes);
- if (isDurable() && getState() != State.DELETED)
- {
- this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
- }
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java Tue Mar 3 14:56:40 2015
@@ -62,7 +62,8 @@ public class DefaultDestination implemen
final AMQQueue q = _virtualHost.getQueue(routingAddress);
if(q == null)
{
- if(routingAddress != null && routingAddress.contains("/") && !routingAddress.startsWith("/"))
+ routingAddress = _virtualHost.getLocalAddress(routingAddress);
+ if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
{
String[] parts = routingAddress.split("/",2);
ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
@@ -71,7 +72,7 @@ public class DefaultDestination implemen
return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
}
}
- else if(routingAddress == null || !routingAddress.contains("/"))
+ else if(!routingAddress.contains("/"))
{
ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
if(exchange != null)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Mar 3 14:56:40 2015
@@ -87,6 +87,12 @@ class HeadersBinding
+"' with arguments: " + _binding.getArguments());
_filter = new MessageFilter()
{
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
@Override
public boolean matches(Filterable message)
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Mar 3 14:56:40 2015
@@ -14,26 +14,62 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
*
+ *
*/
package org.apache.qpid.server.filter;
-//
-// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
-//
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-public interface FilterManager
+public class FilterManager
{
- void add(MessageFilter filter);
- void remove(MessageFilter filter);
+ private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>();
- boolean allAllow(Filterable msg);
+ public FilterManager()
+ {
+ }
+
+ public void add(String name, MessageFilter filter)
+ {
+ _filters.put(name, filter);
+ }
+
+ public boolean allAllow(Filterable msg)
+ {
+ for (MessageFilter filter : _filters.values())
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Iterator<MessageFilter> filters()
+ {
+ return _filters.values().iterator();
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+
+ public boolean hasFilter(final String name)
+ {
+ return _filters.containsKey(name);
+ }
+
+ @Override
+ public String toString()
+ {
+ return _filters.toString();
+ }
- Iterator<MessageFilter> filters();
- boolean hasFilters();
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Mar 3 14:56:40 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.filter;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorPa
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import java.util.Map;
-
public class FilterManagerFactory
{
@@ -54,20 +54,13 @@ public class FilterManagerFactory
if (selector instanceof String && !selector.equals(""))
{
- manager = new SimpleFilterManager();
+ manager = new FilterManager();
try
{
- manager.add(new JMSSelectorFilter((String)selector));
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
+ MessageFilter filter = new JMSSelectorFilter((String)selector);
+ manager.add(filter.getName(), filter);
}
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Tue Mar 3 14:56:40 2015
@@ -26,12 +26,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.queue.AMQQueue;
public class FilterSupport
@@ -57,15 +59,7 @@ public class FilterSupport
{
selector = new JMSSelectorFilter(selectorString);
}
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
@@ -119,6 +113,7 @@ public class FilterSupport
}
}
+ @PluggableService
public static final class NoLocalFilter implements MessageFilter
{
private final MessageSource _queue;
@@ -128,6 +123,12 @@ public class FilterSupport
_queue = queue;
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.NO_LOCAL.toString();
+ }
+
public boolean matches(Filterable message)
{
@@ -165,6 +166,8 @@ public class FilterSupport
{
return _queue != null ? _queue.hashCode() : 0;
}
+
+
}
static final class CompoundFilter implements MessageFilter
@@ -178,6 +181,12 @@ public class FilterSupport
_jmsSelectorFilter = jmsSelectorFilter;
}
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
public boolean matches(Filterable message)
{
return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
@@ -216,5 +225,7 @@ public class FilterSupport
result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
return result;
}
+
+
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Tue Mar 3 14:56:40 2015
@@ -34,6 +34,10 @@ public interface Filterable
Object getConnectionReference();
+ long getMessageNumber();
+
+ long getArrivalTime();
+
public class Factory
{
@@ -41,6 +45,7 @@ public interface Filterable
{
return new Filterable()
{
+
@Override
public AMQMessageHeader getMessageHeader()
{
@@ -64,6 +69,18 @@ public interface Filterable
{
return message.getConnectionReference();
}
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return message.getArrivalTime();
+ }
};
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Mar 3 14:56:40 2015
@@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.H
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.BooleanExpression;
import org.apache.qpid.filter.FilterableMessage;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.SelectorParser;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.PluggableService;
+@PluggableService
public class JMSSelectorFilter implements MessageFilter
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
@@ -46,6 +50,12 @@ public class JMSSelectorFilter implement
_matcher = new SelectorParser().parse(selector);
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.JMS_SELECTOR.toString();
+ }
+
public boolean matches(Filterable message)
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Tue Mar 3 14:56:40 2015
@@ -22,5 +22,6 @@ package org.apache.qpid.server.filter;
public interface MessageFilter
{
+ String getName();
boolean matches(Filterable message);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Mar 3 14:56:40 2015
@@ -122,8 +122,11 @@ public abstract class AbstractConfigured
private final TaskExecutor _taskExecutor;
private final Class<? extends ConfiguredObject> _category;
+ private final Class<? extends ConfiguredObject> _typeClass;
private final Class<? extends ConfiguredObject> _bestFitInterface;
private final Model _model;
+ private final boolean _managesChildStorage;
+
@ManagedAttributeField
private long _createdTime;
@@ -206,6 +209,8 @@ public abstract class AbstractConfigured
_model = model;
_category = ConfiguredObjectTypeRegistry.getCategory(getClass());
+ Class<? extends ConfiguredObject> typeClass = model.getTypeRegistry().getTypeClass(getClass());
+ _typeClass = typeClass == null ? _category : typeClass;
_attributeTypes = model.getTypeRegistry().getAttributeTypes(getClass());
_automatedFields = model.getTypeRegistry().getAutomatedFields(getClass());
@@ -242,6 +247,7 @@ public abstract class AbstractConfigured
}
_type = ConfiguredObjectTypeRegistry.getType(getClass());
+ _managesChildStorage = managesChildren(_category) || managesChildren(_typeClass);
_bestFitInterface = calculateBestFitInterface();
if(attributes.get(TYPE) != null && !_type.equals(attributes.get(TYPE)))
@@ -315,6 +321,11 @@ public abstract class AbstractConfigured
}
}
+ private boolean managesChildren(final Class<? extends ConfiguredObject> clazz)
+ {
+ return clazz.getAnnotation(ManagedObject.class).managesChildren();
+ }
+
private Class<? extends ConfiguredObject> calculateBestFitInterface()
{
Set<Class<? extends ConfiguredObject>> candidates = new HashSet<Class<? extends ConfiguredObject>>();
@@ -1056,11 +1067,24 @@ public abstract class AbstractConfigured
return _model;
}
+ @Override
public Class<? extends ConfiguredObject> getCategoryClass()
{
return _category;
}
+ @Override
+ public Class<? extends ConfiguredObject> getTypeClass()
+ {
+ return _typeClass;
+ }
+
+ @Override
+ public boolean managesChildStorage()
+ {
+ return _managesChildStorage;
+ }
+
public Map<String,String> getContext()
{
return _context == null ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(_context);
@@ -1219,8 +1243,7 @@ public abstract class AbstractConfigured
if(attr != null && (attr.isAutomated() || attr.isDerived()))
{
Object value = attr.getValue((X)this);
- if(value != null && attr.isSecure() &&
- !SecurityManager.isSystemProcess())
+ if(value != null && !SecurityManager.isSystemProcess() && attr.isSecureValue(value))
{
return SECURE_VALUES.get(value.getClass());
}
@@ -1620,8 +1643,9 @@ public abstract class AbstractConfigured
{
Object desired = attributes.get(name);
Object expected = getAttribute(name);
- if(((_attributes.get(name) != null && !_attributes.get(name).equals(attributes.get(name)))
- || attributes.get(name) != null)
+ Object currentValue = _attributes.get(name);
+ if(((currentValue != null && !currentValue.equals(desired))
+ || (currentValue == null && desired != null))
&& changeAttribute(name, expected, desired))
{
attributeSet(name, expected, desired);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java Tue Mar 3 14:56:40 2015
@@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T
}
};
+ static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>()
+ {
+ @Override
+ public Object convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof String)
+ {
+ return AbstractConfiguredObject.interpolate(object, (String) value);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ return value;
+ }
+ }
+ };
static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>()
{
@Override
@@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T
}
else if(Map.class.isAssignableFrom(type))
{
- return (AttributeValueConverter<X>) MAP_CONVERTER;
+ if(returnType instanceof ParameterizedType)
+ {
+ Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
+ Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1];
+
+ return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType);
+ }
+ else
+ {
+ return (AttributeValueConverter<X>) MAP_CONVERTER;
+ }
}
else if(Collection.class.isAssignableFrom(type))
{
@@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T
{
return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type);
}
+ else if(Object.class == type)
+ {
+ return (AttributeValueConverter<X>) OBJECT_CONVERTER;
+ }
throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName());
}
@@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T
}
}
+ public static class GenericMapConverter extends AttributeValueConverter<Map>
+ {
+
+ private final AttributeValueConverter<?> _keyConverter;
+ private final AttributeValueConverter<?> _valueConverter;
+
+
+ public GenericMapConverter(final Type keyType, final Type valueType)
+ {
+ _keyConverter = getConverter(getRawType(keyType), keyType);
+
+ _valueConverter = getConverter(getRawType(valueType), valueType);
+ }
+
+
+ @Override
+ public Map convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof Map)
+ {
+ Map<?,?> original = (Map<?,?>)value;
+ Map converted = new LinkedHashMap(original.size());
+ for(Map.Entry<?,?> entry : original.entrySet())
+ {
+ converted.put(_keyConverter.convert(entry.getKey(),object),
+ _valueConverter.convert(entry.getValue(), object));
+ }
+ return Collections.unmodifiableMap(converted);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ if(value instanceof String)
+ {
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ return convert(objectMapper.readValue(interpolated, Map.class), object);
+ }
+ catch (IOException e)
+ {
+ // not valid JSON for a Map: fall through to the IllegalArgumentException thrown below
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map");
+ }
+
+ }
+ }
+
+
static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X>
{
private final Class<X> _klazz;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java Tue Mar 3 14:56:40 2015
@@ -28,6 +28,7 @@ import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.regex.Pattern;
import org.apache.log4j.Logger;
@@ -37,6 +38,7 @@ public class ConfiguredAutomatedAttribut
private final ManagedAttribute _annotation;
private final Method _validValuesMethod;
+ private final Pattern _secureValuePattern;
ConfiguredAutomatedAttribute(final Class<C> clazz,
final Method getter,
@@ -53,6 +55,16 @@ public class ConfiguredAutomatedAttribut
validValuesMethod = getValidValuesMethod(validValue, clazz);
}
_validValuesMethod = validValuesMethod;
+
+ String secureValueFilter = _annotation.secureValueFilter();
+ if (secureValueFilter == null || "".equals(secureValueFilter))
+ {
+ _secureValuePattern = null;
+ }
+ else
+ {
+ _secureValuePattern = Pattern.compile(secureValueFilter);
+ }
}
private Method getValidValuesMethod(final String validValue, final Class<C> clazz)
@@ -140,6 +152,11 @@ public class ConfiguredAutomatedAttribut
return _annotation.description();
}
+ public Pattern getSecureValueFilter()
+ {
+ return _secureValuePattern;
+ }
+
public Collection<String> validValues()
{
if(_validValuesMethod != null)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java Tue Mar 3 14:56:40 2015
@@ -21,10 +21,12 @@
package org.apache.qpid.server.model;
import java.lang.reflect.Method;
+import java.util.regex.Pattern;
public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttribute<C,T>
{
private final DerivedAttribute _annotation;
+ private final Pattern _secureValuePattern;
ConfiguredDerivedAttribute(final Class<C> clazz,
final Method getter,
@@ -32,6 +34,16 @@ public class ConfiguredDerivedAttribute<
{
super(clazz, getter);
_annotation = annotation;
+
+ String secureValueFilter = _annotation.secureValueFilter();
+ if (secureValueFilter == null || "".equals(secureValueFilter))
+ {
+ _secureValuePattern = null;
+ }
+ else
+ {
+ _secureValuePattern = Pattern.compile(secureValueFilter);
+ }
}
public boolean isAutomated()
@@ -72,4 +84,10 @@ public class ConfiguredDerivedAttribute<
return _annotation.description();
}
+ @Override
+ public Pattern getSecureValueFilter()
+ {
+ return _secureValuePattern;
+ }
+
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Tue Mar 3 14:56:40 2015
@@ -239,6 +239,9 @@ public interface ConfiguredObject<X exte
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
Class<? extends ConfiguredObject> getCategoryClass();
+ Class<? extends ConfiguredObject> getTypeClass();
+
+ boolean managesChildStorage();
<C extends ConfiguredObject<C>> C findConfiguredObject(Class<C> clazz, String name);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java Tue Mar 3 14:56:40 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.model;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
+import java.util.regex.Pattern;
public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T>
{
@@ -49,6 +50,25 @@ public abstract class ConfiguredObjectAt
public abstract String getDescription();
+ public abstract Pattern getSecureValueFilter();
+
+ public boolean isSecureValue(Object value)
+ {
+ if (isSecure())
+ {
+ Pattern filter = getSecureValueFilter();
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ return filter.matcher(String.valueOf(value)).matches();
+ }
+ }
+ return false;
+ }
+
public T convert(final Object value, C object)
{
final AttributeValueConverter<T> converter = getConverter();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Tue Mar 3 14:56:40 2015
@@ -156,7 +156,7 @@ public class ConfiguredObjectFactoryImpl
factory = categoryFactories.get(_defaultTypes.get(category));
if(factory == null)
{
- throw new NoFactoryForTypeException(category, _defaultTypes.get(category));
+ throw new NoFactoryForTypeException(category, type);
}
}
return factory;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Tue Mar 3 14:56:40 2015
@@ -385,7 +385,7 @@ public class ConfiguredObjectTypeRegistr
return null;
}
- private Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
+ public Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
{
String typeName = getType(clazz);
Class<? extends ConfiguredObject> typeClass = null;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java Tue Mar 3 14:56:40 2015
@@ -32,4 +32,5 @@ public @interface DerivedAttribute
boolean persist() default false;
String description() default "";
boolean oversize() default false;
+ String secureValueFilter() default "";
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java Tue Mar 3 14:56:40 2015
@@ -37,4 +37,5 @@ public @interface ManagedAttribute
String[] validValues() default {};
boolean oversize() default false;
String oversizedAltText() default "";
+ String secureValueFilter() default "";
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Mar 3 14:56:40 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.model;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -48,6 +50,8 @@ public interface Queue<X extends Queue<X
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String DEFAULT_FILTERS = "defaultFilters";
+ String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -67,6 +71,9 @@ public interface Queue<X extends Queue<X
@ManagedAttribute( defaultValue = "NONE" )
ExclusivityPolicy getExclusive();
+ @ManagedAttribute( defaultValue = "false" )
+ boolean isEnsureNondestructiveConsumers();
+
@DerivedAttribute( persist = true )
String getOwner();
@@ -155,6 +162,9 @@ public interface Queue<X extends Queue<X
@ManagedAttribute
long getMaximumMessageTtl();
+ @ManagedAttribute
+ Map<String, Map<String,List<String>>> getDefaultFilters();
+
//children
Collection<? extends Binding> getBindings();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java Tue Mar 3 14:56:40 2015
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.model;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ManagedObject;
-
@ManagedObject(category=true, managesChildren=false, creatable=false)
public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X>
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java Tue Mar 3 14:56:40 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -37,6 +38,9 @@ public interface SystemConfig<X extends
String INITIAL_CONFIGURATION_LOCATION = "initialConfigurationLocation";
String STARTUP_LOGGED_TO_SYSTEM_OUT = "startupLoggedToSystemOut";
+ @ManagedContextDefault(name = BrokerProperties.POSIX_FILE_PERMISSIONS)
+ String DEFAULT_POSIX_FILE_PERMISSIONS = "rw-r-----";
+
@ManagedAttribute(defaultValue = "false")
boolean isManagementMode();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Mar 3 14:56:40 2015
@@ -22,14 +22,16 @@ package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
-@ManagedObject( managesChildren = true, defaultType = "ProvidedStore")
+@ManagedObject( defaultType = "ProvidedStore")
public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
{
@@ -42,6 +44,9 @@ public interface VirtualHost<X extends V
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount";
String MODEL_VERSION = "modelVersion";
+ String ENABLED_CONNECTION_VALIDATORS = "enabledConnectionValidators";
+ String DISABLED_CONNECTION_VALIDATORS = "disabledConnectionValidators";
+ String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains";
@ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
@@ -88,6 +93,21 @@ public interface VirtualHost<X extends V
@DerivedAttribute( persist = true )
String getModelVersion();
+ @ManagedContextDefault( name = "virtualhost.enabledConnectionValidators")
+ String DEFAULT_ENABLED_VALIDATORS = "[]";
+
+ @ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}")
+ List<String> getEnabledConnectionValidators();
+
+ @ManagedContextDefault( name = "virtualhost.disabledConnectionValidators")
+ String DEFAULT_DISABLED_VALIDATORS = "[]";
+
+ @ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}")
+ List<String> getDisabledConnectionValidators();
+
+ @ManagedAttribute( defaultValue = "[]")
+ List<String> getGlobalAddressDomains();
+
@ManagedStatistic
long getQueueCount();
@@ -129,6 +149,8 @@ public interface VirtualHost<X extends V
void delete();
+ String getRedirectHost(AmqpPort<?> port);
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Tue Mar 3 14:56:40 2015
@@ -24,7 +24,7 @@ import java.util.Collection;
import org.apache.qpid.server.store.DurableConfigurationStore;
-@ManagedObject(category=true, managesChildren=false)
+@ManagedObject(category=true, managesChildren=true)
public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
{
String QPID_INITIAL_CONFIG_VIRTUALHOST_CONFIG_VAR = "qpid.initial_config_virtualhost_config";
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Mar 3 14:56:40 2015
@@ -80,7 +80,7 @@ public final class ConnectionAdapter ext
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
- attributes.put(NAME, _connection.getRemoteAddressString().replaceAll("/", ""));
+ attributes.put(NAME, "[" + _connection.getConnectionId() + "] " + _connection.getRemoteAddressString().replaceAll("/", ""));
attributes.put(DURABLE, false);
return attributes;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java Tue Mar 3 14:56:40 2015
@@ -25,7 +25,7 @@ import org.apache.qpid.server.model.Grou
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type = "GroupFile" )
+@ManagedObject( category = false, type = "GroupFile", managesChildren = true )
public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>, GroupManagingGroupProvider
{
String PATH="path";
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Tue Mar 3 14:56:40 2015
@@ -34,6 +34,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.group.FileGroupDatabase;
import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.util.FileHelper;
public class FileBasedGroupProviderImpl
extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl>
@@ -162,9 +164,11 @@ public class FileBasedGroupProviderImpl
{
throw new IllegalConfigurationException(String.format("Cannot create groups file at '%s'",_path));
}
+
try
{
- file.createNewFile();
+ String posixFileAttributes = getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS);
+ new FileHelper().createNewFile(file, posixFileAttributes);
}
catch (IOException e)
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java Tue Mar 3 14:56:40 2015
@@ -21,14 +21,14 @@
package org.apache.qpid.server.model.adapter;
-import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +38,9 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -118,7 +121,7 @@ public class FileSystemPreferencesProvid
FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
// we need to check and create file if it does not exist every time on open
- store.createIfNotExist();
+ store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
store.open();
_store = store;
_open = true;
@@ -184,6 +187,7 @@ public class FileSystemPreferencesProvid
if(_store != null)
{
+ _store.close();
_store.delete();
deleted();
_authenticationProvider.setPreferencesProvider(null);
@@ -280,7 +284,7 @@ public class FileSystemPreferencesProvid
else
{
FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
- store.createIfNotExist();
+ store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
store.open();
_store = store;
}
@@ -334,9 +338,9 @@ public class FileSystemPreferencesProvid
{
private final ObjectMapper _objectMapper;
private final Map<String, Map<String, Object>> _preferences;
+ private final FileHelper _fileHelper;
private File _storeFile;
private FileLock _storeLock;
- private RandomAccessFile _storeRAF;
public FileSystemPreferencesStore(File preferencesFile)
{
@@ -345,9 +349,10 @@ public class FileSystemPreferencesProvid
_objectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
_objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
_preferences = new TreeMap<String, Map<String, Object>>();
+ _fileHelper = new FileHelper();
}
- public void createIfNotExist()
+ public void createIfNotExist(String filePermissions)
{
if (!_storeFile.exists())
{
@@ -358,7 +363,8 @@ public class FileSystemPreferencesProvid
}
try
{
- if (_storeFile.createNewFile() && !_storeFile.exists())
+ Path path = _fileHelper.createNewFile(_storeFile, filePermissions);
+ if (!Files.exists(path))
{
throw new IllegalConfigurationException(String.format("Cannot create preferences store file at '%s'", _storeFile.getAbsolutePath()));
}
@@ -391,43 +397,20 @@ public class FileSystemPreferencesProvid
}
try
{
- _storeRAF = new RandomAccessFile(_storeFile, "rw");
- FileChannel fileChannel = _storeRAF.getChannel();
- try
- {
- _storeLock = fileChannel.tryLock();
- }
- catch (OverlappingFileLockException e)
- {
- _storeLock = null;
- }
- if (_storeLock == null)
+ getFileLock(_storeFile.getPath() + ".lck");
+ if (_storeFile.length() > 0)
{
- throw new IllegalConfigurationException("Cannot get lock on store file " + _storeFile.getName()
- + " is another instance running?");
- }
- long fileSize = fileChannel.size();
- if (fileSize > 0)
- {
- ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
- fileChannel.read(buffer);
- buffer.rewind();
- buffer.flip();
- byte[] data = buffer.array();
- try
- {
- Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(data,
- new TypeReference<Map<String, Map<String, Object>>>()
- {
- });
- _preferences.putAll(preferencesMap);
- }
- catch (JsonProcessingException e)
- {
- throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
- }
+ Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(_storeFile,
+ new TypeReference<Map<String, Map<String, Object>>>()
+ {
+ });
+ _preferences.putAll(preferencesMap);
}
}
+ catch (JsonProcessingException e)
+ {
+ throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
+ }
catch (IOException e)
{
throw new IllegalConfigurationException("Cannot load preferences from " + _storeFile.getName(), e);
@@ -443,6 +426,7 @@ public class FileSystemPreferencesProvid
if (_storeLock != null)
{
_storeLock.release();
+ _storeLock.channel().close();
}
}
catch (IOException e)
@@ -452,22 +436,7 @@ public class FileSystemPreferencesProvid
finally
{
_storeLock = null;
- try
- {
- if (_storeRAF != null)
- {
- _storeRAF.close();
- }
- }
- catch (IOException e)
- {
- LOGGER.error("Cannot close preferences file", e);
- }
- finally
- {
- _storeRAF = null;
- _preferences.clear();
- }
+ _preferences.clear();
}
}
}
@@ -544,16 +513,14 @@ public class FileSystemPreferencesProvid
checkStoreOpened();
try
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- _objectMapper.writeValue(baos, _preferences);
- FileChannel channel = _storeRAF.getChannel();
- long currentSize = channel.size();
- channel.position(0);
- channel.write(ByteBuffer.wrap(baos.toByteArray()));
- if (currentSize > baos.size())
+ _fileHelper.writeFileSafely(_storeFile.toPath(), new BaseAction<File, IOException>()
{
- channel.truncate(baos.size());
- }
+ @Override
+ public void performAction(File file) throws IOException
+ {
+ _objectMapper.writeValue(file, _preferences);
+ }
+ });
}
catch (IOException e)
{
@@ -569,5 +536,32 @@ public class FileSystemPreferencesProvid
}
}
+ private void getFileLock(String lockFilePath)
+ {
+ File lockFile = new File(lockFilePath);
+ try
+ {
+ lockFile.createNewFile();
+ lockFile.deleteOnExit();
+
+ @SuppressWarnings("resource")
+ FileOutputStream out = new FileOutputStream(lockFile);
+ FileChannel channel = out.getChannel();
+ _storeLock = channel.tryLock();
+ }
+ catch (IOException ioe)
+ {
+ throw new IllegalStateException("Cannot create the lock file " + lockFile.getName(), ioe);
+ }
+ catch(OverlappingFileLockException e)
+ {
+ _storeLock = null;
+ }
+
+ if(_storeLock == null)
+ {
+ throw new IllegalStateException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?");
+ }
+ }
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java Tue Mar 3 14:56:40 2015
@@ -19,8 +19,11 @@
package org.apache.qpid.server.plugin;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.ServiceLoader;
import org.apache.log4j.Logger;
@@ -47,6 +50,16 @@ public class QpidServiceLoader
return instancesOf(clazz, true);
}
+ public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz)
+ {
+ Map<String,C> instances = new HashMap<>();
+ for(C instance : instancesOf(clazz))
+ {
+ instances.put(instance.getType(), instance);
+ }
+ return Collections.unmodifiableMap(instances);
+ }
+
private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne)
{
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Mar 3 14:56:40 2015
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -52,6 +54,7 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -75,6 +78,8 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
@@ -186,6 +191,9 @@ public abstract class AbstractQueue<X ex
@ManagedAttributeField
private MessageDurability _messageDurability;
+ @ManagedAttributeField
+ private Map<String, Map<String,List<String>>> _defaultFilters;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -241,12 +249,15 @@ public abstract class AbstractQueue<X ex
private long _minimumMessageTtl;
@ManagedAttributeField
private long _maximumMessageTtl;
+ @ManagedAttributeField
+ private boolean _ensureNondestructiveConsumers;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
private boolean _closing;
+ private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -283,11 +294,7 @@ public abstract class AbstractQueue<X ex
});
}
- if (isDurable())
- {
- _virtualHost.getDurableConfigurationStore().create(asObjectRecord());
- }
- else if(getMessageDurability() != MessageDurability.NEVER)
+ if(!isDurable() && getMessageDurability() != MessageDurability.NEVER)
{
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
new PrivilegedAction<Object>()
@@ -351,17 +358,9 @@ public abstract class AbstractQueue<X ex
case PRINCIPAL:
_exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
- if(isDurable())
- {
- _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
- }
break;
case CONTAINER:
_exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
- if(isDurable())
- {
- _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
- }
break;
case CONNECTION:
_exclusiveOwner = sessionModel.getConnectionModel();
@@ -450,6 +449,40 @@ public abstract class AbstractQueue<X ex
}
_maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
+
+ if(_defaultFilters != null)
+ {
+ QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
+ final Map<String, MessageFilterFactory> messageFilterFactories =
+ qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
+
+ for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet())
+ {
+ String name = String.valueOf(entry.getKey());
+ Map<String, List<String>> filterValue = entry.getValue();
+ if(filterValue.size() == 1)
+ {
+ String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
+ MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+ if(filterFactory != null)
+ {
+ List<String> filterArguments = filterValue.values().iterator().next();
+ _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet());
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue);
+
+ }
+
+ }
+ }
+
updateAlertChecks();
}
@@ -555,6 +588,12 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public Map<String, Map<String, List<String>>> getDefaultFilters()
+ {
+ return _defaultFilters;
+ }
+
+ @Override
public final MessageDurability getMessageDurability()
{
return _messageDurability;
@@ -573,6 +612,14 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public boolean isEnsureNondestructiveConsumers()
+ {
+ return _ensureNondestructiveConsumers;
+ }
+
+
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -603,7 +650,7 @@ public abstract class AbstractQueue<X ex
@Override
public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- final FilterManager filters,
+ FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
EnumSet<ConsumerImpl.Option> optionSet)
@@ -699,6 +746,26 @@ public abstract class AbstractQueue<X ex
{
throw new ExistingConsumerPreventsExclusive();
}
+ if(!_defaultFiltersMap.isEmpty())
+ {
+ if(filters == null)
+ {
+ filters = new FilterManager();
+ }
+ for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+ {
+ if(!filters.hasFilter(filter.getKey()))
+ {
+ filters.add(filter.getKey(), filter.getValue());
+ }
+ }
+ }
+
+ if(_ensureNondestructiveConsumers)
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+ }
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Tue Mar 3 14:56:40 2015
@@ -62,11 +62,16 @@ public class QueueArgumentsConverter
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+
+ public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
public static final String QPID_NO_LOCAL = "no-local";
+
static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+
static
{
ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
@@ -99,6 +104,8 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
+ ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
+ ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 3 14:56:40 2015
@@ -371,11 +371,16 @@ public abstract class QueueEntryImpl imp
}
}
- private void dequeue()
+ private boolean dequeue()
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ while(state.getState() == State.ACQUIRED && !_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ {
+ state = _state;
+ }
+
+ if(state.getState() == State.ACQUIRED)
{
if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
@@ -387,7 +392,11 @@ public abstract class QueueEntryImpl imp
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
}
-
+ return true;
+ }
+ else
+ {
+ return false;
}
}
@@ -420,9 +429,10 @@ public abstract class QueueEntryImpl imp
public void delete()
{
- dequeue();
-
- dispose();
+ if(dequeue())
+ {
+ dispose();
+ }
}
public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java Tue Mar 3 14:56:40 2015
@@ -62,7 +62,7 @@ public interface FileKeyStore<X extends
@ManagedAttribute(defaultValue = "${this:path}")
String getDescription();
- @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT)
+ @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
String getStoreUrl();
@DerivedAttribute
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java Tue Mar 3 14:56:40 2015
@@ -31,7 +31,7 @@ public interface NonJavaKeyStore<X exten
@ManagedAttribute(defaultValue = "${this:subjectName}")
String getDescription();
- @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )
+ @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
String getPrivateKeyUrl();
@ManagedAttribute( mandatory = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org