You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/12 14:07:17 UTC
[4/6] activemq-artemis git commit: ARTEMIS-1156: moving our
collections on its own package
ARTEMIS-1156: moving our collections on its own package
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc26ac96
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc26ac96
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc26ac96
Branch: refs/heads/master
Commit: dc26ac96b4b24b5caa8a14db24b25b6f7e8026d9
Parents: c1d55aa
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri May 12 10:00:15 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri May 12 10:06:05 2017 -0400
----------------------------------------------------------------------
.../artemis/cli/process/ProcessBuilder.java | 2 +-
.../artemis/core/server/NetworkHealthCheck.java | 2 +-
.../artemis/utils/ConcurrentHashSet.java | 81 --
.../activemq/artemis/utils/ConcurrentSet.java | 29 -
.../activemq/artemis/utils/DataConstants.java | 2 +-
.../activemq/artemis/utils/TypedProperties.java | 937 ------------------
.../utils/collections/ConcurrentHashSet.java | 81 ++
.../utils/collections/ConcurrentSet.java | 29 +
.../artemis/utils/collections/LinkedList.java | 32 +
.../utils/collections/LinkedListImpl.java | 390 ++++++++
.../utils/collections/LinkedListIterator.java | 32 +
.../utils/collections/PriorityLinkedList.java | 38 +
.../collections/PriorityLinkedListImpl.java | 248 +++++
.../utils/collections/TypedProperties.java | 939 +++++++++++++++++++
...uentPropertyBeanIntrospectorWithIgnores.java | 2 +-
.../artemis/utils/ConcurrentHashSetTest.java | 2 +
.../utils/TypedPropertiesConversionTest.java | 1 +
.../artemis/utils/TypedPropertiesTest.java | 1 +
.../core/client/impl/ClientConsumerImpl.java | 4 +-
.../core/client/impl/ClientMessageImpl.java | 2 +-
.../core/client/impl/ClientMessageInternal.java | 2 +-
.../client/impl/ClientSessionFactoryImpl.java | 2 +-
.../artemis/core/cluster/DiscoveryGroup.java | 2 +-
.../artemis/core/message/impl/CoreMessage.java | 2 +-
.../core/server/management/Notification.java | 2 +-
.../activemq/artemis/reader/MapMessageUtil.java | 2 +-
.../activemq/artemis/utils/LinkedList.java | 32 -
.../activemq/artemis/utils/LinkedListImpl.java | 390 --------
.../artemis/utils/LinkedListIterator.java | 32 -
.../artemis/utils/PriorityLinkedList.java | 38 -
.../artemis/utils/PriorityLinkedListImpl.java | 248 -----
.../util/TimeAndCounterIDGeneratorTest.java | 2 +-
.../store/file/JDBCSequentialFileFactory.java | 2 +-
.../artemis/jms/client/ActiveMQConnection.java | 2 +-
.../artemis/jms/client/ActiveMQJMSProducer.java | 2 +-
.../artemis/jms/client/ActiveMQMapMessage.java | 2 +-
.../artemis/jms/client/ThreadAwareContext.java | 2 +-
.../jms/server/impl/JMSServerManagerImpl.java | 4 +-
.../artemis/core/journal/impl/JournalImpl.java | 8 +-
.../amqp/converter/jms/ServerJMSMapMessage.java | 2 +-
.../protocol/mqtt/MQTTConnectionManager.java | 2 +-
.../protocol/mqtt/MQTTRetainMessageManager.java | 2 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 2 +-
.../artemis/ra/ActiveMQRAConnectionManager.java | 2 +-
.../artemis/ra/recovery/RecoveryManager.java | 2 +-
.../impl/ActiveMQServerControlImpl.java | 4 +-
.../core/management/impl/QueueControlImpl.java | 2 +-
.../core/paging/cursor/PageIterator.java | 2 +-
.../core/paging/cursor/PageSubscription.java | 2 +-
.../cursor/impl/PageSubscriptionImpl.java | 2 +-
.../activemq/artemis/core/paging/impl/Page.java | 2 +-
.../core/paging/impl/PagingManagerImpl.java | 2 +-
.../impl/journal/LargeServerMessageImpl.java | 2 +-
.../core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../core/remoting/impl/invm/InVMAcceptor.java | 2 +-
.../core/remoting/impl/netty/NettyAcceptor.java | 2 +-
.../core/security/impl/SecurityStoreImpl.java | 4 +-
.../activemq/artemis/core/server/Queue.java | 2 +-
.../core/server/cluster/ClusterManager.java | 2 +-
.../core/server/cluster/impl/BridgeImpl.java | 2 +-
.../server/cluster/impl/BroadcastGroupImpl.java | 2 +-
.../cluster/impl/ClusterConnectionImpl.java | 4 +-
.../group/impl/GroupHandlingAbstract.java | 2 +-
.../server/group/impl/LocalGroupingHandler.java | 2 +-
.../group/impl/RemoteGroupingHandler.java | 4 +-
.../core/server/impl/ActiveMQServerImpl.java | 7 +-
.../artemis/core/server/impl/QueueImpl.java | 10 +-
.../core/server/impl/ScaleDownHandler.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 4 +-
.../core/server/impl/ServerSessionImpl.java | 15 +-
.../management/impl/ManagementServiceImpl.java | 4 +-
.../core/list/PriorityLinkedListTest.java | 4 +-
.../group/impl/ClusteredResetMockTest.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 2 +-
etc/findbugs-exclude.xml | 2 +-
.../tests/integration/client/ConsumerTest.java | 2 +-
.../integration/client/SlowConsumerTest.java | 4 +-
.../integration/cluster/bridge/BridgeTest.java | 5 +-
.../integration/mqtt/imported/MQTTFQQNTest.java | 2 +-
.../integration/mqtt/imported/MQTTTest.java | 2 +-
.../integration/paging/PagingSendTest.java | 2 +-
.../integration/plugin/MqttPluginTest.java | 32 +-
.../stress/paging/PageCursorStressTest.java | 2 +-
.../unit/core/postoffice/impl/FakeQueue.java | 5 +-
.../unit/core/server/impl/QueueImplTest.java | 2 +-
.../artemis/tests/unit/util/LinkedListTest.java | 4 +-
87 files changed, 1904 insertions(+), 1902 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
index 5e4acf4..9eb7960 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
public class ProcessBuilder {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java
index cabd045..eef79a1 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentHashSet.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentHashSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentHashSet.java
deleted file mode 100644
index 1b4e5d7..0000000
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentHashSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.AbstractSet;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A ConcurrentHashSet.
- *
- * Offers same concurrency as ConcurrentHashMap but for a Set
- */
-public class ConcurrentHashSet<E> extends AbstractSet<E> implements ConcurrentSet<E> {
-
- private final ConcurrentMap<E, Object> theMap;
-
- private static final Object dummy = new Object();
-
- public ConcurrentHashSet() {
- theMap = new ConcurrentHashMap<>();
- }
-
- @Override
- public int size() {
- return theMap.size();
- }
-
- @Override
- public Iterator<E> iterator() {
- return theMap.keySet().iterator();
- }
-
- @Override
- public boolean isEmpty() {
- return theMap.isEmpty();
- }
-
- @Override
- public boolean add(final E o) {
- return theMap.put(o, ConcurrentHashSet.dummy) == null;
- }
-
- @Override
- public boolean contains(final Object o) {
- return theMap.containsKey(o);
- }
-
- @Override
- public void clear() {
- theMap.clear();
- }
-
- @Override
- public boolean remove(final Object o) {
- return theMap.remove(o) == ConcurrentHashSet.dummy;
- }
-
- @Override
- public boolean addIfAbsent(final E o) {
- Object obj = theMap.putIfAbsent(o, ConcurrentHashSet.dummy);
-
- return obj == null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentSet.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentSet.java
deleted file mode 100644
index e55be24..0000000
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentSet.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.Set;
-
-/**
- * A ConcurrentSet
- *
- * @param <E> The generic class
- */
-public interface ConcurrentSet<E> extends Set<E> {
-
- boolean addIfAbsent(E o);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/DataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/DataConstants.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/DataConstants.java
index 38df9b6..5b2b365 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/DataConstants.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/DataConstants.java
@@ -32,7 +32,7 @@ public final class DataConstants {
public static final int SIZE_FLOAT = 4;
- static final int SIZE_CHAR = 2;
+ public static final int SIZE_CHAR = 2;
public static final byte TRUE = 1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
deleted file mode 100644
index fda135b..0000000
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
-
-import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
-import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
-import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
-import static org.apache.activemq.artemis.utils.DataConstants.CHAR;
-import static org.apache.activemq.artemis.utils.DataConstants.DOUBLE;
-import static org.apache.activemq.artemis.utils.DataConstants.FLOAT;
-import static org.apache.activemq.artemis.utils.DataConstants.INT;
-import static org.apache.activemq.artemis.utils.DataConstants.LONG;
-import static org.apache.activemq.artemis.utils.DataConstants.NULL;
-import static org.apache.activemq.artemis.utils.DataConstants.SHORT;
-import static org.apache.activemq.artemis.utils.DataConstants.STRING;
-
-/**
- * Property Value Conversion.
- * <p>
- * This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
- * (Version 1.1 April 12, 2002).
- * <p>
- */
-public final class TypedProperties {
-
- private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
-
- private Map<SimpleString, PropertyValue> properties;
-
- private volatile int size;
-
- private boolean internalProperties;
-
- public TypedProperties() {
- }
-
- /**
- * Return the number of properites
- * */
- public int size() {
- return properties.size();
- }
-
- public int getMemoryOffset() {
- // The estimate is basically the encode size + 2 object references for each entry in the map
- // Note we don't include the attributes or anything else since they already included in the memory estimate
- // of the ServerMessage
-
- return properties == null ? 0 : size + 2 * DataConstants.SIZE_INT * properties.size();
- }
-
- public TypedProperties(final TypedProperties other) {
- properties = other.properties == null ? null : new HashMap<>(other.properties);
- size = other.size;
- }
-
- public boolean hasInternalProperties() {
- return internalProperties;
- }
-
- public void putBooleanProperty(final SimpleString key, final boolean value) {
- checkCreateProperties();
- doPutValue(key, new BooleanValue(value));
- }
-
- public void putByteProperty(final SimpleString key, final byte value) {
- checkCreateProperties();
- doPutValue(key, new ByteValue(value));
- }
-
- public void putBytesProperty(final SimpleString key, final byte[] value) {
- checkCreateProperties();
- doPutValue(key, value == null ? new NullValue() : new BytesValue(value));
- }
-
- public void putShortProperty(final SimpleString key, final short value) {
- checkCreateProperties();
- doPutValue(key, new ShortValue(value));
- }
-
- public void putIntProperty(final SimpleString key, final int value) {
- checkCreateProperties();
- doPutValue(key, new IntValue(value));
- }
-
- public void putLongProperty(final SimpleString key, final long value) {
- checkCreateProperties();
- doPutValue(key, new LongValue(value));
- }
-
- public void putFloatProperty(final SimpleString key, final float value) {
- checkCreateProperties();
- doPutValue(key, new FloatValue(value));
- }
-
- public void putDoubleProperty(final SimpleString key, final double value) {
- checkCreateProperties();
- doPutValue(key, new DoubleValue(value));
- }
-
- public void putSimpleStringProperty(final SimpleString key, final SimpleString value) {
- checkCreateProperties();
- doPutValue(key, value == null ? new NullValue() : new StringValue(value));
- }
-
- public void putNullValue(final SimpleString key) {
- checkCreateProperties();
- doPutValue(key, new NullValue());
- }
-
- public void putCharProperty(final SimpleString key, final char value) {
- checkCreateProperties();
- doPutValue(key, new CharValue(value));
- }
-
- public void putTypedProperties(final TypedProperties otherProps) {
- if (otherProps == null || otherProps.properties == null) {
- return;
- }
-
- checkCreateProperties();
- Set<Entry<SimpleString, PropertyValue>> otherEntries = otherProps.properties.entrySet();
- for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries) {
- doPutValue(otherEntry.getKey(), otherEntry.getValue());
- }
- }
-
- public Object getProperty(final SimpleString key) {
- return doGetProperty(key);
- }
-
- public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Boolean.valueOf(null);
- } else if (value instanceof Boolean) {
- return (Boolean) value;
- } else if (value instanceof SimpleString) {
- return Boolean.valueOf(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Byte.valueOf(null);
- } else if (value instanceof Byte) {
- return (Byte) value;
- } else if (value instanceof SimpleString) {
- return Byte.parseByte(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Character getCharProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- throw new NullPointerException("Invalid conversion: " + key);
- }
-
- if (value instanceof Character) {
- return ((Character) value);
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return null;
- } else if (value instanceof byte[]) {
- return (byte[]) value;
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Double.valueOf(null);
- } else if (value instanceof Float) {
- return ((Float) value).doubleValue();
- } else if (value instanceof Double) {
- return (Double) value;
- } else if (value instanceof SimpleString) {
- return Double.parseDouble(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Integer.valueOf(null);
- } else if (value instanceof Integer) {
- return (Integer) value;
- } else if (value instanceof Byte) {
- return ((Byte) value).intValue();
- } else if (value instanceof Short) {
- return ((Short) value).intValue();
- } else if (value instanceof SimpleString) {
- return Integer.parseInt(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Long.valueOf(null);
- } else if (value instanceof Long) {
- return (Long) value;
- } else if (value instanceof Byte) {
- return ((Byte) value).longValue();
- } else if (value instanceof Short) {
- return ((Short) value).longValue();
- } else if (value instanceof Integer) {
- return ((Integer) value).longValue();
- } else if (value instanceof SimpleString) {
- return Long.parseLong(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null) {
- return Short.valueOf(null);
- } else if (value instanceof Byte) {
- return ((Byte) value).shortValue();
- } else if (value instanceof Short) {
- return (Short) value;
- } else if (value instanceof SimpleString) {
- return Short.parseShort(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
- if (value == null)
- return Float.valueOf(null);
- if (value instanceof Float) {
- return ((Float) value);
- }
- if (value instanceof SimpleString) {
- return Float.parseFloat(((SimpleString) value).toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- Object value = doGetProperty(key);
-
- if (value == null) {
- return null;
- }
-
- if (value instanceof SimpleString) {
- return (SimpleString) value;
- } else if (value instanceof Boolean) {
- return new SimpleString(value.toString());
- } else if (value instanceof Character) {
- return new SimpleString(value.toString());
- } else if (value instanceof Byte) {
- return new SimpleString(value.toString());
- } else if (value instanceof Short) {
- return new SimpleString(value.toString());
- } else if (value instanceof Integer) {
- return new SimpleString(value.toString());
- } else if (value instanceof Long) {
- return new SimpleString(value.toString());
- } else if (value instanceof Float) {
- return new SimpleString(value.toString());
- } else if (value instanceof Double) {
- return new SimpleString(value.toString());
- }
- throw new ActiveMQPropertyConversionException("Invalid conversion: " + key);
- }
-
- public Object removeProperty(final SimpleString key) {
- return doRemoveProperty(key);
- }
-
- public boolean containsProperty(final SimpleString key) {
- if (size == 0) {
- return false;
-
- } else {
- return properties.containsKey(key);
- }
- }
-
- public Set<SimpleString> getPropertyNames() {
- if (size == 0) {
- return Collections.emptySet();
- } else {
- return properties.keySet();
- }
- }
-
- public synchronized void decode(final ByteBuf buffer) {
- byte b = buffer.readByte();
-
- if (b == DataConstants.NULL) {
- properties = null;
- } else {
- int numHeaders = buffer.readInt();
-
- properties = new HashMap<>(numHeaders);
- size = 0;
-
- for (int i = 0; i < numHeaders; i++) {
- int len = buffer.readInt();
- byte[] data = new byte[len];
- buffer.readBytes(data);
- SimpleString key = new SimpleString(data);
-
- byte type = buffer.readByte();
-
- PropertyValue val;
-
- switch (type) {
- case NULL: {
- val = new NullValue();
- doPutValue(key, val);
- break;
- }
- case CHAR: {
- val = new CharValue(buffer);
- doPutValue(key, val);
- break;
- }
- case BOOLEAN: {
- val = new BooleanValue(buffer);
- doPutValue(key, val);
- break;
- }
- case BYTE: {
- val = new ByteValue(buffer);
- doPutValue(key, val);
- break;
- }
- case BYTES: {
- val = new BytesValue(buffer);
- doPutValue(key, val);
- break;
- }
- case SHORT: {
- val = new ShortValue(buffer);
- doPutValue(key, val);
- break;
- }
- case INT: {
- val = new IntValue(buffer);
- doPutValue(key, val);
- break;
- }
- case LONG: {
- val = new LongValue(buffer);
- doPutValue(key, val);
- break;
- }
- case FLOAT: {
- val = new FloatValue(buffer);
- doPutValue(key, val);
- break;
- }
- case DOUBLE: {
- val = new DoubleValue(buffer);
- doPutValue(key, val);
- break;
- }
- case STRING: {
- val = new StringValue(buffer);
- doPutValue(key, val);
- break;
- }
- default: {
- throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
- }
- }
- }
- }
- }
-
- public synchronized void encode(final ByteBuf buffer) {
- if (properties == null) {
- buffer.writeByte(DataConstants.NULL);
- } else {
- buffer.writeByte(DataConstants.NOT_NULL);
-
- buffer.writeInt(properties.size());
-
- for (Map.Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
- SimpleString s = entry.getKey();
- byte[] data = s.getData();
- buffer.writeInt(data.length);
- buffer.writeBytes(data);
-
- entry.getValue().write(buffer);
- }
- }
- }
-
- public int getEncodeSize() {
- if (properties == null) {
- return DataConstants.SIZE_BYTE;
- } else {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
- }
- }
-
- public void clear() {
- if (properties != null) {
- properties.clear();
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("TypedProperties[");
-
- if (properties != null) {
-
- Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator();
-
- while (iter.hasNext()) {
- Entry<SimpleString, PropertyValue> iterItem = iter.next();
- sb.append(iterItem.getKey() + "=");
-
- // it seems weird but it's right!!
- // The first getValue is from the EntrySet
- // The second is to convert the PropertyValue into the actual value
- Object theValue = iterItem.getValue().getValue();
-
- if (theValue == null) {
- sb.append("NULL-value");
- } else if (theValue instanceof byte[]) {
- sb.append("[" + ByteUtil.maxString(ByteUtil.bytesToHex((byte[]) theValue, 2), 150) + ")");
-
- if (iterItem.getKey().toString().startsWith("_AMQ_ROUTE_TO")) {
- sb.append(",bytesAsLongs(");
- try {
- ByteBuffer buff = ByteBuffer.wrap((byte[]) theValue);
- while (buff.hasRemaining()) {
- long bindingID = buff.getLong();
- sb.append(bindingID);
- if (buff.hasRemaining()) {
- sb.append(",");
- }
- }
- } catch (Throwable e) {
- sb.append("error-converting-longs=" + e.getMessage());
- }
- sb.append("]");
- }
- } else {
- sb.append(theValue.toString());
- }
-
- if (iter.hasNext()) {
- sb.append(",");
- }
- }
- }
-
- return sb.append("]").toString();
- }
-
- // Private ------------------------------------------------------------------------------------
-
- private void checkCreateProperties() {
- if (properties == null) {
- properties = new HashMap<>();
- }
- }
-
- private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
- if (key.startsWith(AMQ_PROPNAME)) {
- internalProperties = true;
- }
-
- PropertyValue oldValue = properties.put(key, value);
- if (oldValue != null) {
- size += value.encodeSize() - oldValue.encodeSize();
- } else {
- size += SimpleString.sizeofString(key) + value.encodeSize();
- }
- }
-
- private synchronized Object doRemoveProperty(final SimpleString key) {
- if (properties == null) {
- return null;
- }
-
- PropertyValue val = properties.remove(key);
-
- if (val == null) {
- return null;
- } else {
- size -= SimpleString.sizeofString(key) + val.encodeSize();
-
- return val.getValue();
- }
- }
-
- private synchronized Object doGetProperty(final Object key) {
- if (size == 0) {
- return null;
- }
-
- PropertyValue val = properties.get(key);
-
- if (val == null) {
- return null;
- } else {
- return val.getValue();
- }
- }
-
- // Inner classes ------------------------------------------------------------------------------
-
- private abstract static class PropertyValue {
-
- abstract Object getValue();
-
- abstract void write(ByteBuf buffer);
-
- abstract int encodeSize();
-
- @Override
- public String toString() {
- return "" + getValue();
- }
- }
-
- private static final class NullValue extends PropertyValue {
-
- private NullValue() {
- }
-
- @Override
- public Object getValue() {
- return null;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.NULL);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE;
- }
-
- }
-
- private static final class BooleanValue extends PropertyValue {
-
- final boolean val;
-
- private BooleanValue(final boolean val) {
- this.val = val;
- }
-
- private BooleanValue(final ByteBuf buffer) {
- val = buffer.readBoolean();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.BOOLEAN);
- buffer.writeBoolean(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN;
- }
-
- }
-
- private static final class ByteValue extends PropertyValue {
-
- final byte val;
-
- private ByteValue(final byte val) {
- this.val = val;
- }
-
- private ByteValue(final ByteBuf buffer) {
- val = buffer.readByte();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.BYTE);
- buffer.writeByte(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_BYTE;
- }
- }
-
- private static final class BytesValue extends PropertyValue {
-
- final byte[] val;
-
- private BytesValue(final byte[] val) {
- this.val = val;
- }
-
- private BytesValue(final ByteBuf buffer) {
- int len = buffer.readInt();
- val = new byte[len];
- buffer.readBytes(val);
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.BYTES);
- buffer.writeInt(val.length);
- buffer.writeBytes(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + val.length;
- }
-
- }
-
- private static final class ShortValue extends PropertyValue {
-
- final short val;
-
- private ShortValue(final short val) {
- this.val = val;
- }
-
- private ShortValue(final ByteBuf buffer) {
- val = buffer.readShort();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.SHORT);
- buffer.writeShort(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_SHORT;
- }
- }
-
- private static final class IntValue extends PropertyValue {
-
- final int val;
-
- private IntValue(final int val) {
- this.val = val;
- }
-
- private IntValue(final ByteBuf buffer) {
- val = buffer.readInt();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.INT);
- buffer.writeInt(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
- }
- }
-
- private static final class LongValue extends PropertyValue {
-
- final long val;
-
- private LongValue(final long val) {
- this.val = val;
- }
-
- private LongValue(final ByteBuf buffer) {
- val = buffer.readLong();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.LONG);
- buffer.writeLong(val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
- }
- }
-
- private static final class FloatValue extends PropertyValue {
-
- final float val;
-
- private FloatValue(final float val) {
- this.val = val;
- }
-
- private FloatValue(final ByteBuf buffer) {
- val = Float.intBitsToFloat(buffer.readInt());
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.FLOAT);
- buffer.writeInt(Float.floatToIntBits(val));
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_FLOAT;
- }
-
- }
-
- private static final class DoubleValue extends PropertyValue {
-
- final double val;
-
- private DoubleValue(final double val) {
- this.val = val;
- }
-
- private DoubleValue(final ByteBuf buffer) {
- val = Double.longBitsToDouble(buffer.readLong());
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.DOUBLE);
- buffer.writeLong(Double.doubleToLongBits(val));
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_DOUBLE;
- }
- }
-
- private static final class CharValue extends PropertyValue {
-
- final char val;
-
- private CharValue(final char val) {
- this.val = val;
- }
-
- private CharValue(final ByteBuf buffer) {
- val = (char) buffer.readShort();
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.CHAR);
- buffer.writeShort((short) val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_CHAR;
- }
- }
-
- private static final class StringValue extends PropertyValue {
-
- final SimpleString val;
-
- private StringValue(final SimpleString val) {
- this.val = val;
- }
-
- private StringValue(final ByteBuf buffer) {
- val = SimpleString.readSimpleString(buffer);
- }
-
- @Override
- public Object getValue() {
- return val;
- }
-
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.STRING);
- SimpleString.writeSimpleString(buffer, val);
- }
-
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
- }
- }
-
- public boolean isEmpty() {
- return properties.isEmpty();
- }
-
- public Map<String, Object> getMap() {
- Map<String, Object> m = new HashMap<>();
- for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
- Object val = entry.getValue().getValue();
- if (val instanceof SimpleString) {
- m.put(entry.getKey().toString(), ((SimpleString) val).toString());
- } else {
- m.put(entry.getKey().toString(), val);
- }
- }
- return m;
- }
-
- /**
- * Helper for MapMessage#setObjectProperty(String, Object)
- *
- * @param key The SimpleString key
- * @param value The Object value
- * @param properties The typed properties
- */
- public static void setObjectProperty(final SimpleString key, final Object value, final TypedProperties properties) {
- if (value == null) {
- properties.putNullValue(key);
- } else if (value instanceof Boolean) {
- properties.putBooleanProperty(key, (Boolean) value);
- } else if (value instanceof Byte) {
- properties.putByteProperty(key, (Byte) value);
- } else if (value instanceof Character) {
- properties.putCharProperty(key, (Character) value);
- } else if (value instanceof Short) {
- properties.putShortProperty(key, (Short) value);
- } else if (value instanceof Integer) {
- properties.putIntProperty(key, (Integer) value);
- } else if (value instanceof Long) {
- properties.putLongProperty(key, (Long) value);
- } else if (value instanceof Float) {
- properties.putFloatProperty(key, (Float) value);
- } else if (value instanceof Double) {
- properties.putDoubleProperty(key, (Double) value);
- } else if (value instanceof String) {
- properties.putSimpleStringProperty(key, new SimpleString((String) value));
- } else if (value instanceof SimpleString) {
- properties.putSimpleStringProperty(key, (SimpleString) value);
- } else if (value instanceof byte[]) {
- properties.putBytesProperty(key, (byte[]) value);
- } else {
- throw new ActiveMQPropertyConversionException(value.getClass() + " is not a valid property type");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentHashSet.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentHashSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentHashSet.java
new file mode 100644
index 0000000..21c1588
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentHashSet.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A ConcurrentHashSet.
+ *
+ * Offers same concurrency as ConcurrentHashMap but for a Set
+ */
+public class ConcurrentHashSet<E> extends AbstractSet<E> implements ConcurrentSet<E> {
+
+ private final ConcurrentMap<E, Object> theMap;
+
+ private static final Object dummy = new Object();
+
+ public ConcurrentHashSet() {
+ theMap = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public int size() {
+ return theMap.size();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return theMap.keySet().iterator();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return theMap.isEmpty();
+ }
+
+ @Override
+ public boolean add(final E o) {
+ return theMap.put(o, ConcurrentHashSet.dummy) == null;
+ }
+
+ @Override
+ public boolean contains(final Object o) {
+ return theMap.containsKey(o);
+ }
+
+ @Override
+ public void clear() {
+ theMap.clear();
+ }
+
+ @Override
+ public boolean remove(final Object o) {
+ return theMap.remove(o) == ConcurrentHashSet.dummy;
+ }
+
+ @Override
+ public boolean addIfAbsent(final E o) {
+ Object obj = theMap.putIfAbsent(o, ConcurrentHashSet.dummy);
+
+ return obj == null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentSet.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentSet.java
new file mode 100644
index 0000000..bab7aa3
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentSet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Set;
+
+/**
+ * A ConcurrentSet
+ *
+ * @param <E> The generic class
+ */
+public interface ConcurrentSet<E> extends Set<E> {
+
+ boolean addIfAbsent(E o);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java
new file mode 100644
index 0000000..3a77c42
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+public interface LinkedList<E> {
+
+ void addHead(E e);
+
+ void addTail(E e);
+
+ E poll();
+
+ LinkedListIterator<E> iterator();
+
+ void clear();
+
+ int size();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
new file mode 100644
index 0000000..81b5d60
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.lang.reflect.Array;
+import java.util.NoSuchElementException;
+
+/**
+ * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
+ * elements added or removed from the queue either directly or via iterators.
+ *
+ * This class is not thread safe.
+ */
+public class LinkedListImpl<E> implements LinkedList<E> {
+
+ private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
+
+ private final Node<E> head = new Node<>(null);
+
+ private Node<E> tail = null;
+
+ private int size;
+
+ // We store in an array rather than a Map for the best performance
+ private volatile Iterator[] iters;
+
+ private int numIters;
+
+ private int nextIndex;
+
+ public LinkedListImpl() {
+ iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
+ }
+
+ @Override
+ public void addHead(E e) {
+ Node<E> node = new Node<>(e);
+
+ node.next = head.next;
+
+ node.prev = head;
+
+ head.next = node;
+
+ if (size == 0) {
+ tail = node;
+ } else {
+ // Need to set the previous element on the former head
+ node.next.prev = node;
+ }
+
+ size++;
+ }
+
+ @Override
+ public void addTail(E e) {
+ if (size == 0) {
+ addHead(e);
+ } else {
+ Node<E> node = new Node<>(e);
+
+ node.prev = tail;
+
+ tail.next = node;
+
+ tail = node;
+
+ size++;
+ }
+ }
+
+ @Override
+ public E poll() {
+ Node<E> ret = head.next;
+
+ if (ret != null) {
+ removeAfter(head);
+
+ return ret.val;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void clear() {
+ tail = head.next = null;
+
+ size = 0;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public LinkedListIterator<E> iterator() {
+ return new Iterator();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder str = new StringBuilder("LinkedListImpl [ ");
+
+ Node<E> node = head;
+
+ while (node != null) {
+ str.append(node.toString());
+
+ if (node.next != null) {
+ str.append(", ");
+ }
+
+ node = node.next;
+ }
+
+ return str.toString();
+ }
+
+ public int numIters() {
+ return numIters;
+ }
+
+ private Iterator[] createIteratorArray(int size) {
+ return (Iterator[]) Array.newInstance(Iterator.class, size);
+ }
+
+ private void removeAfter(Node<E> node) {
+ Node<E> toRemove = node.next;
+
+ node.next = toRemove.next;
+
+ if (toRemove.next != null) {
+ toRemove.next.prev = node;
+ }
+
+ if (toRemove == tail) {
+ tail = node;
+ }
+
+ size--;
+
+ if (toRemove.iterCount != 0) {
+ LinkedListImpl.this.nudgeIterators(toRemove);
+ }
+
+ //Help GC - otherwise GC potentially has to traverse a very long list to see if elements are reachable, this can result in OOM
+ //https://jira.jboss.org/browse/HORNETQ-469
+ toRemove.next = toRemove.prev = null;
+ }
+
+ private synchronized void nudgeIterators(Node<E> node) {
+ for (int i = 0; i < numIters; i++) {
+ Iterator iter = iters[i];
+ if (iter != null) {
+ iter.nudged(node);
+ }
+ }
+ }
+
+ private synchronized void addIter(Iterator iter) {
+ if (numIters == iters.length) {
+ resize(2 * numIters);
+ }
+
+ iters[nextIndex++] = iter;
+
+ numIters++;
+ }
+
+ private synchronized void resize(int newSize) {
+ Iterator[] newIters = createIteratorArray(newSize);
+
+ System.arraycopy(iters, 0, newIters, 0, numIters);
+
+ iters = newIters;
+ }
+
+ private synchronized void removeIter(Iterator iter) {
+ for (int i = 0; i < numIters; i++) {
+ if (iter == iters[i]) {
+ iters[i] = null;
+
+ if (i != numIters - 1) {
+ // Fill in the hole
+
+ System.arraycopy(iters, i + 1, iters, i, numIters - i - 1);
+ }
+
+ numIters--;
+
+ if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2) {
+ resize(numIters);
+ }
+
+ nextIndex--;
+
+ return;
+ }
+ }
+
+ throw new IllegalStateException("Cannot find iter to remove");
+ }
+
+ private static final class Node<E> {
+
+ Node<E> next;
+
+ Node<E> prev;
+
+ final E val;
+
+ int iterCount;
+
+ Node(E e) {
+ val = e;
+ }
+
+ @Override
+ public String toString() {
+ return "Node, value = " + val;
+ }
+ }
+
+ private class Iterator implements LinkedListIterator<E> {
+
+ Node<E> last;
+
+ Node<E> current = head.next;
+
+ boolean repeat;
+
+ Iterator() {
+ if (current != null) {
+ current.iterCount++;
+ }
+
+ addIter(this);
+ }
+
+ @Override
+ public void repeat() {
+ repeat = true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ Node<E> e = getNode();
+
+ if (e != null && (e != last || repeat)) {
+ return true;
+ }
+
+ return canAdvance();
+ }
+
+ @Override
+ public E next() {
+ Node<E> e = getNode();
+
+ if (repeat) {
+ repeat = false;
+
+ if (e != null) {
+ return e.val;
+ } else {
+ if (canAdvance()) {
+ advance();
+
+ e = getNode();
+
+ return e.val;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+ }
+
+ if (e == null || e == last) {
+ if (canAdvance()) {
+ advance();
+
+ e = getNode();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ last = e;
+
+ repeat = false;
+
+ return e.val;
+ }
+
+ @Override
+ public void remove() {
+ if (last == null) {
+ throw new NoSuchElementException();
+ }
+
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+
+ LinkedListImpl.this.removeAfter(current.prev);
+
+ last = null;
+ }
+
+ @Override
+ public void close() {
+ removeIter(this);
+ }
+
+ public void nudged(Node<E> node) {
+ if (current == node) {
+ if (canAdvance()) {
+ advance();
+ } else {
+ if (current.prev != head) {
+ current.iterCount--;
+
+ current = current.prev;
+
+ current.iterCount++;
+ } else {
+ current = null;
+ }
+ }
+ }
+ }
+
+ private Node<E> getNode() {
+ if (current == null) {
+ current = head.next;
+
+ if (current != null) {
+ current.iterCount++;
+ }
+ }
+
+ if (current != null) {
+ return current;
+ } else {
+ return null;
+ }
+ }
+
+ private boolean canAdvance() {
+ if (current == null) {
+ current = head.next;
+
+ if (current != null) {
+ current.iterCount++;
+ }
+ }
+
+ return current != null && current.next != null;
+ }
+
+ private void advance() {
+ if (current == null || current.next == null) {
+ throw new NoSuchElementException();
+ }
+
+ current.iterCount--;
+
+ current = current.next;
+
+ current.iterCount++;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java
new file mode 100644
index 0000000..62ab097
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * A LinkedListIterator
+ *
+ * This iterator allows the last element to be repeated in the next call to hasNext or next
+ */
+public interface LinkedListIterator<E> extends Iterator<E>, AutoCloseable {
+
+ void repeat();
+
+ @Override
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
new file mode 100644
index 0000000..79a99f3
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * A type of linked list which maintains items according to a priority
+ * and allows adding and removing of elements at both ends, and peeking
+ */
+public interface PriorityLinkedList<T> {
+
+ void addHead(T t, int priority);
+
+ void addTail(T t, int priority);
+
+ T poll();
+
+ void clear();
+
+ int size();
+
+ LinkedListIterator<T> iterator();
+
+ boolean isEmpty();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc26ac96/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
new file mode 100644
index 0000000..39d6b6d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.lang.reflect.Array;
+import java.util.NoSuchElementException;
+
+/**
+ * A priority linked list implementation
+ * <p>
+ * It implements this by maintaining an individual LinkedBlockingDeque for each priority level.
+ */
+public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
+
+ protected LinkedListImpl<T>[] levels;
+
+ private int size;
+
+ private int lastReset;
+
+ private int highestPriority = -1;
+
+ private int lastPriority = -1;
+
+ public PriorityLinkedListImpl(final int priorities) {
+ levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);
+
+ for (int i = 0; i < priorities; i++) {
+ levels[i] = new LinkedListImpl<>();
+ }
+ }
+
+ private void checkHighest(final int priority) {
+ if (lastPriority != priority || priority > highestPriority) {
+ lastPriority = priority;
+ if (lastReset == Integer.MAX_VALUE) {
+ lastReset = 0;
+ } else {
+ lastReset++;
+ }
+ }
+
+ if (priority > highestPriority) {
+ highestPriority = priority;
+ }
+ }
+
+ @Override
+ public void addHead(final T t, final int priority) {
+ checkHighest(priority);
+
+ levels[priority].addHead(t);
+
+ size++;
+ }
+
+ @Override
+ public void addTail(final T t, final int priority) {
+ checkHighest(priority);
+
+ levels[priority].addTail(t);
+
+ size++;
+ }
+
+ @Override
+ public T poll() {
+ T t = null;
+
+ // We are just using a simple prioritization algorithm:
+ // Highest priority refs always get returned first.
+ // This could cause starvation of lower priority refs.
+
+ // TODO - A better prioritization algorithm
+
+ for (int i = highestPriority; i >= 0; i--) {
+ LinkedListImpl<T> ll = levels[i];
+
+ if (ll.size() != 0) {
+ t = ll.poll();
+
+ if (t != null) {
+ size--;
+
+ if (ll.size() == 0) {
+ if (highestPriority == i) {
+ highestPriority--;
+ }
+ }
+ }
+
+ break;
+ }
+ }
+
+ return t;
+ }
+
+ @Override
+ public void clear() {
+ for (LinkedListImpl<T> list : levels) {
+ list.clear();
+ }
+
+ size = 0;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ @Override
+ public LinkedListIterator<T> iterator() {
+ return new PriorityLinkedListIterator();
+ }
+
+ private class PriorityLinkedListIterator implements LinkedListIterator<T> {
+
+ private int index;
+
+ private final LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
+
+ private LinkedListIterator<T> lastIter;
+
+ private int resetCount = lastReset;
+
+ volatile boolean closed = false;
+
+ PriorityLinkedListIterator() {
+ index = levels.length - 1;
+ }
+
+ @Override
+ protected void finalize() {
+ close();
+ }
+
+ @Override
+ public void repeat() {
+ if (lastIter == null) {
+ throw new NoSuchElementException();
+ }
+
+ lastIter.repeat();
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ closed = true;
+ lastIter = null;
+
+ for (LinkedListIterator<T> iter : cachedIters) {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ }
+ }
+
+ private void checkReset() {
+ if (lastReset != resetCount) {
+ index = highestPriority;
+
+ resetCount = lastReset;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ checkReset();
+
+ while (index >= 0) {
+ lastIter = cachedIters[index];
+
+ if (lastIter == null) {
+ lastIter = cachedIters[index] = levels[index].iterator();
+ }
+
+ boolean b = lastIter.hasNext();
+
+ if (b) {
+ return true;
+ }
+
+ index--;
+
+ if (index < 0) {
+ index = levels.length - 1;
+
+ break;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public T next() {
+ if (lastIter == null) {
+ throw new NoSuchElementException();
+ }
+
+ return lastIter.next();
+ }
+
+ @Override
+ public void remove() {
+ if (lastIter == null) {
+ throw new NoSuchElementException();
+ }
+
+ lastIter.remove();
+
+ // This next statement would be the equivalent of:
+ // if (index == highestPriority && levels[index].size() == 0)
+ // However we have to keep checking all the previous levels
+ // otherwise we would cache a max that will not exist
+ // what would make us eventually having hasNext() returning false
+ // as a bug
+ // Part of the fix for HORNETQ-705
+ for (int i = index; i >= 0 && levels[index].size() == 0; i--) {
+ highestPriority = i;
+ }
+
+ size--;
+ }
+ }
+}