You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/01/17 08:33:56 UTC
[2/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 75faa97..57865b7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -154,7 +154,7 @@ public class AMQConsumer {
}
addressInfo.setInternal(internalAddress);
if (isDurable) {
- queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
+ queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) {
// Already exists
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 64d1353..bca7eae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
@@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager;
+ private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
// TODO Auto-generated method stub
}
@@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
+ org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
@@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback {
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- SimpleString address = new SimpleString(dest.getPhysicalName());
+ SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
- checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+ checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 18e0b10..0a12b47 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -239,7 +239,7 @@ public class StompSession implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
if (stompSubscription != null) {
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 9133cdf..f343ec9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
// Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) {
- SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+ SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName());
QueueQuery subResponse = session.queueQuery(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 2276fdb..d38f45f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
-import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerPacketDecoder extends ClientPacketDecoder {
- private static final int UUID_LENGTH = 36;
- private static final int DEFAULT_INTERNER_CAPACITY = 32;
private static final long serialVersionUID = 3348673114388400766L;
- private SimpleString.Interner keysInterner;
- private TypedProperties.StringValue.Interner valuesInterner;
-
- public ServerPacketDecoder() {
- this.keysInterner = null;
- this.valuesInterner = null;
- }
-
- private void initializeInternersIfNeeded() {
- if (this.keysInterner == null) {
- this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
- }
- if (this.valuesInterner == null) {
- this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
- }
- }
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage;
- initializeInternersIfNeeded();
if (connection.isVersionBeforeAddressChange()) {
- sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
+ sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
} else {
- sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
+ sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
}
sendMessage.decode(in);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8b281eb..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
@@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback {
private ServerSessionPacketHandler handler;
+ private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
public CoreSessionCallback(String name,
ProtocolManager protocolManager,
Channel channel,
@@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback {
Packet packet;
if (channel.getConnection().isVersionBeforeAddressChange()) {
- packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
+ packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
} else {
- packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+ packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
}
int size = 0;
@@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
channel.send(new DisconnectConsumerMessage(consumerId.getID()));
} else {
- ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName);
+ ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 5dc1b93..15b1465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void disconnect() {
- callback.disconnect(this, getQueue().getName().toString());
+ callback.disconnect(this, getQueue().getName());
}
public float getRate() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a440e31..2c81343 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
public interface MessageConverter<ProtocolMessage extends Message> {
- ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
+ ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception;
ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ae1612f..c4a2dbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -81,7 +81,7 @@ public interface SessionCallback {
void closed();
- void disconnect(ServerConsumer consumerId, String queueName);
+ void disconnect(ServerConsumer consumerId, SimpleString queueName);
boolean isWritable(ReadyListener callback, Object protocolContext);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2707190..5cfac12 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
@@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Override
public CoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
+ public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
@@ -591,6 +597,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public Message putStringProperty(SimpleString key, String value) {
+ return null;
+ }
+
+ @Override
public Message putStringProperty(String key, String value) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d7c9855..078c397 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
@Override
public ICoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
+ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
@@ -648,6 +654,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
+ public Message putStringProperty(SimpleString key, String value) {
+ return null;
+ }
+
+ @Override
public Message putStringProperty(String key, String value) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2f25480..dc57a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
//To change body of implemented methods use File | Settings | File Templates.
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 790ed82..aaf29b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
msg.putStringProperty("myNonAsciiStringProperty", international.toString());
msg.putStringProperty("mySpecialCharacters", special);
msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i));
- msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null);
+ msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null);
producer.send(msg);
}