You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/02 15:05:50 UTC
[16/29] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 548b62c..b997d80 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,12 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -43,11 +45,11 @@ public class MQTTSessionCallback implements SessionCallback {
@Override
public int sendMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
int deliveryCount) {
try {
- session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+ session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
} catch (Exception e) {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
}
@@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 7bc6b84..6891497 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -28,8 +28,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
/**
* A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
@@ -93,13 +92,13 @@ public class MQTTUtil {
return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
}
- private static ServerMessage createServerMessage(MQTTSession session,
+ private static Message createServerMessage(MQTTSession session,
SimpleString address,
boolean retain,
int qos) {
long id = session.getServer().getStorageManager().generateID();
- ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+ CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
@@ -107,21 +106,21 @@ public class MQTTUtil {
return message;
}
- public static ServerMessage createServerMessageFromByteBuf(MQTTSession session,
+ public static Message createServerMessageFromByteBuf(MQTTSession session,
String topic,
boolean retain,
int qos,
ByteBuf payload) {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
- ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+ Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
// FIXME does this involve a copy?
message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
return message;
}
- public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
- ServerMessage message = createServerMessage(session, address, false, 1);
+ public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
+ Message message = createServerMessage(session, address, false, 1);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 9b27b81..550a63a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -36,11 +36,10 @@ import java.util.zip.InflaterOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -102,16 +101,16 @@ public class OpenWireMessageConverter implements MessageConverter {
}
@Override
- public Object outbound(ServerMessage message, int deliveryCount) {
+ public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
// TODO: implement this
return null;
}
@Override
- public ServerMessage inbound(Object message) throws Exception {
+ public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
Message messageSend = (Message) message;
- ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+ CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
String type = messageSend.getType();
if (type != null) {
@@ -157,7 +156,7 @@ public class OpenWireMessageConverter implements MessageConverter {
mdataIn.close();
TypedProperties props = new TypedProperties();
loadMapIntoProperties(props, map);
- props.encode(body);
+ props.encode(body.byteBuf());
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
if (messageCompressed) {
@@ -415,8 +414,9 @@ public class OpenWireMessageConverter implements MessageConverter {
}
public static MessageDispatch createMessageDispatch(MessageReference reference,
- ServerMessage message,
+ org.apache.activemq.artemis.api.core.Message message,
AMQConsumer consumer) throws IOException, JMSException {
+ // TODO-now: use new Encode here
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
//we can use core message id for sequenceId
@@ -433,7 +433,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
private static ActiveMQMessage toAMQMessage(MessageReference reference,
- ServerMessage coreMessage,
+ org.apache.activemq.artemis.api.core.Message coreMessage,
WireFormat marshaller,
ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null;
@@ -476,7 +476,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
+ ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
@@ -503,7 +503,7 @@ public class OpenWireMessageConverter implements MessageConverter {
TypedProperties mapData = new TypedProperties();
//it could be a null map
if (buffer.readableBytes() > 0) {
- mapData.decode(buffer);
+ mapData.decode(buffer.byteBuf());
Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index f471a2a..6f83c2d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
@@ -35,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -208,7 +208,7 @@ public class AMQConsumer {
}
- public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
+ public int handleDeliver(MessageReference reference, Message message, int deliveryCount) {
MessageDispatch dispatch;
try {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -394,7 +394,7 @@ public class AMQConsumer {
}
}
- public boolean checkForcedConsumer(ServerMessage message) {
+ public boolean checkForcedConsumer(Message message) {
if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
if (next >= 0) {
if (timeout <= 0) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 79004ae..1b7ed43 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.reader.MessageUtil;
@@ -231,7 +230,7 @@ public class AMQSession implements SessionCallback {
@Override
public int sendMessage(MessageReference reference,
- ServerMessage message,
+ org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumer,
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -240,7 +239,7 @@ public class AMQSession implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumerID,
long bodySize,
int deliveryCount) {
@@ -296,7 +295,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+ org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
if (connection.isNoLocal()) {
//Note: advisory messages are dealt with in
@@ -324,7 +323,7 @@ public class AMQSession implements SessionCallback {
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
SimpleString address = new SimpleString(dest.getPhysicalName());
- ServerMessage coreMsg = originalCoreMsg.copy();
+ org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 5355c63..2686907 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -53,8 +54,8 @@ public class OpenWireUtil {
* set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
* consumer
*/
- public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
- String address = message.getAddress().toString();
+ public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) {
+ String address = message.getAddress();
String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 861c524..d377abd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
@@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle {
ActiveMQStompException invalidConnection();
@Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e);
+ ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e);
@Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQStompException errorBeginTx(String txID, @Cause Exception e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index c004a0e..c64c1ea 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection {
return valid;
}
- public ServerMessageImpl createServerMessage() {
+ public CoreMessage createServerMessage() {
return manager.createServerMessage();
}
@@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection {
}
}
- protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException {
+ protected void sendServerMessage(Message message, String txID) throws ActiveMQStompException {
StompSession stompSession = getSession(txID);
if (stompSession.isNoLocal()) {
@@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection {
if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
stompSession.sendInternal(message, false);
} else {
- stompSession.sendInternalLarge(message, false);
+ stompSession.sendInternalLarge((CoreMessage)message, false);
}
} catch (Exception e) {
throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
@@ -726,7 +726,7 @@ public final class StompConnection implements RemotingConnection {
return SERVER_NAME;
}
- public StompFrame createStompMessage(ServerMessage serverMessage,
+ public StompFrame createStompMessage(Message serverMessage,
StompSubscription subscription,
int deliveryCount) throws Exception {
return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 54339a4..2be0be4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -33,12 +33,12 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -345,8 +345,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
return validated;
}
- public ServerMessageImpl createServerMessage() {
- return new ServerMessageImpl(server.getStorageManager().generateID(), 512);
+ public CoreMessage createServerMessage() {
+ return new CoreMessage(server.getStorageManager().generateID(), 512);
}
public void commitTransaction(StompConnection connection, String txID) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 1e103e9..d2d42b7 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -28,20 +28,18 @@ import java.util.zip.Inflater;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -127,11 +125,13 @@ public class StompSession implements SessionCallback {
@Override
public int sendMessage(MessageReference ref,
- ServerMessage serverMessage,
+ Message serverMessage,
final ServerConsumer consumer,
int deliveryCount) {
+
+ //TODO-now: fix encoders
LargeServerMessageImpl largeMessage = null;
- ServerMessage newServerMessage = serverMessage;
+ Message newServerMessage = serverMessage;
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
StompFrame frame = null;
@@ -139,20 +139,23 @@ public class StompSession implements SessionCallback {
newServerMessage = serverMessage.copy();
largeMessage = (LargeServerMessageImpl) serverMessage;
- BodyEncoder encoder = largeMessage.getBodyEncoder();
+ LargeBodyEncoder encoder = largeMessage.getBodyEncoder();
encoder.open();
int bodySize = (int) encoder.getLargeBodySize();
+ // TODO-now: Convert large mesasge body into the stomp message
//large message doesn't have a body.
- ((ServerMessageImpl) newServerMessage).createBody(bodySize);
- encoder.encode(newServerMessage.getBodyBuffer(), bodySize);
- encoder.close();
+ // ((Message) newServerMessage).createBody(bodySize);
+// encoder.encode(((ServerMessage)newServerMessage).getBodyBuffer(), bodySize);
+// encoder.close();
+
+ throw new RuntimeException("Large message body won't work with stomp now");
}
if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
//decompress
ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer();
- int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET;
+ int bytesToRead = qbuff.writerIndex() - CoreMessage.BODY_OFFSET;
Inflater inflater = new Inflater();
inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
@@ -219,7 +222,7 @@ public class StompSession implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference ref,
- ServerMessage msg,
+ Message msg,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
@@ -370,11 +373,11 @@ public class StompSession implements SessionCallback {
this.noLocal = noLocal;
}
- public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception {
+ public void sendInternal(Message message, boolean direct) throws Exception {
session.send(message, direct);
}
- public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception {
+ public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception {
int headerSize = message.getHeadersAndPropertiesEncodeSize();
if (headerSize >= connection.getMinLargeMessageSize()) {
throw BUNDLE.headerTooBig();
@@ -384,7 +387,7 @@ public class StompSession implements SessionCallback {
long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
- byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET];
+ byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
message.getBodyBuffer().readBytes(bytes);
largeMessage.addBytes(bytes);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index affab84..7db9d82 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -24,8 +24,6 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
public class StompUtils {
@@ -37,7 +35,7 @@ public class StompUtils {
// Static --------------------------------------------------------
- public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception {
+ public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
@@ -79,7 +77,7 @@ public class StompUtils {
}
}
- public static void copyStandardHeadersFromMessageToFrame(MessageInternal message,
+ public static void copyStandardHeadersFromMessageToFrame(Message message,
StompFrame command,
int deliveryCount) throws Exception {
command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index f91ba82..8d13613 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -21,15 +21,13 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler {
long timestamp = System.currentTimeMillis();
- ServerMessageImpl message = connection.createServerMessage();
+ CoreMessage message = connection.createServerMessage();
if (routingType != null) {
message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
}
@@ -289,7 +287,7 @@ public abstract class VersionedStompFrameHandler {
return response;
}
- public StompFrame createMessageFrame(ServerMessage serverMessage,
+ public StompFrame createMessageFrame(Message serverMessage,
StompSubscription subscription,
int deliveryCount) throws Exception {
StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -298,11 +296,12 @@ public abstract class VersionedStompFrameHandler {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
- ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
+ // TODO-now fix encoders
+ ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+ int bodyPos = ((CoreMessage)serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : ((CoreMessage)serverMessage).getEndOfBodyPosition();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
int size = bodyPos - buffer.readerIndex();
@@ -321,7 +320,7 @@ public abstract class VersionedStompFrameHandler {
}
frame.setByteBody(data);
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+ StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount);
return frame;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6b211d2..b14605d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -27,7 +28,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -48,7 +48,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
+ public StompFrame createMessageFrame(Message serverMessage,
StompSubscription subscription,
int deliveryCount) throws Exception {
StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7881470..30d6668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -750,10 +750,6 @@ public interface Configuration {
Configuration setLogJournalWriteRate(boolean rate);
- int getJournalPerfBlastPages();
-
- Configuration setJournalPerfBlastPages(int pages);
-
long getServerDumpInterval();
Configuration setServerDumpInterval(long interval);
@@ -766,10 +762,6 @@ public interface Configuration {
Configuration setMemoryMeasureInterval(long memoryMeasureInterval);
- boolean isRunSyncSpeedTest();
-
- Configuration setRunSyncSpeedTest(boolean run);
-
// Paging Properties --------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index f4eda91..329f654 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate();
- protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages();
-
- protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest();
-
private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled();
@@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
- public int getJournalPerfBlastPages() {
- return journalPerfBlastPages;
- }
-
- @Override
- public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) {
- this.journalPerfBlastPages = journalPerfBlastPages;
- return this;
- }
-
- @Override
- public boolean isRunSyncSpeedTest() {
- return runSyncSpeedTest;
- }
-
- @Override
- public ConfigurationImpl setRunSyncSpeedTest(final boolean run) {
- runSyncSpeedTest = run;
- return this;
- }
-
- @Override
public boolean isCreateBindingsDir() {
return createBindingsDir;
}
@@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
result = prime * result + journalMaxIO_AIO;
result = prime * result + journalMaxIO_NIO;
result = prime * result + journalMinFiles;
- result = prime * result + journalPerfBlastPages;
result = prime * result + (journalSyncNonTransactional ? 1231 : 1237);
result = prime * result + (journalSyncTransactional ? 1231 : 1237);
result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
@@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
result = prime * result + (persistIDCache ? 1231 : 1237);
result = prime * result + (persistenceEnabled ? 1231 : 1237);
result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode());
- result = prime * result + (runSyncSpeedTest ? 1231 : 1237);
result = prime * result + scheduledThreadPoolMaxSize;
result = prime * result + (securityEnabled ? 1231 : 1237);
result = prime * result + (populateValidatedUser ? 1231 : 1237);
@@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return false;
if (journalMinFiles != other.journalMinFiles)
return false;
- if (journalPerfBlastPages != other.journalPerfBlastPages)
- return false;
if (journalSyncNonTransactional != other.journalSyncNonTransactional)
return false;
if (journalSyncTransactional != other.journalSyncTransactional)
@@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return false;
} else if (!queueConfigurations.equals(other.queueConfigurations))
return false;
- if (runSyncSpeedTest != other.runSyncSpeedTest)
- return false;
if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize)
return false;
if (securityEnabled != other.securityEnabled)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index cea0598..4055b5c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate()));
- config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO));
-
- config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest()));
-
if (e.hasAttribute("wild-card-routing-enabled")) {
config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 41d5e54..3737e19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.core.filter;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
public interface Filter {
@@ -31,7 +31,7 @@ public interface Filter {
*/
String GENERIC_IGNORED_FILTER = "__AMQX=-1";
- boolean match(ServerMessage message);
+ boolean match(Message message);
SimpleString getFilterString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 0a459c9..9d321c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.selector.filter.BooleanExpression;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.filter.Filterable;
@@ -103,7 +103,7 @@ public class FilterImpl implements Filter {
}
@Override
- public synchronized boolean match(final ServerMessage message) {
+ public synchronized boolean match(final Message message) {
try {
boolean result = booleanExpression.matches(new FilterableServerMessage(message));
return result;
@@ -148,7 +148,7 @@ public class FilterImpl implements Filter {
// Private --------------------------------------------------------------------------
- private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
+ private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) {
if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
if (msg.getUserID() == null) {
// Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string
@@ -178,9 +178,9 @@ public class FilterImpl implements Filter {
private static class FilterableServerMessage implements Filterable {
- private final ServerMessage message;
+ private final Message message;
- private FilterableServerMessage(ServerMessage message) {
+ private FilterableServerMessage(Message message) {
this.message = message;
}
@@ -191,7 +191,7 @@ public class FilterImpl implements Filter {
result = getHeaderFieldValue(message, new SimpleString(id));
}
if (result == null) {
- result = message.getObjectProperty(new SimpleString(id));
+ result = message.getObjectProperty(id);
}
if (result != null) {
if (result.getClass() == SimpleString.class) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 09dd702..31e056c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return null;
}
});
- ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+ CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4b84909..5ecea64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
Filter singleMessageFilter = new Filter() {
@Override
- public boolean match(ServerMessage message) {
+ public boolean match(Message message) {
return message.getMessageID() == messageID;
}
@@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
return null;
}
});
- ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+ CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
@@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index ec6848b..9f36b7f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -35,7 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
public final class OpenTypeSupport {
@@ -128,6 +128,7 @@ public final class OpenTypeSupport {
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = new HashMap<>();
+ // TODO-now: fix this
Message m = ref.getMessage();
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
if (m.getUserID() != null) {
@@ -143,6 +144,11 @@ public final class OpenTypeSupport {
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
+ ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
+ byte[] bytes = new byte[bodyCopy.readableBytes()];
+ bodyCopy.readBytes(bytes);
+ rc.put(CompositeDataConstants.BODY, bytes);
+
Map<String, Object> propertyMap = m.toPropertyMap();
rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
@@ -264,8 +270,8 @@ public final class OpenTypeSupport {
@Override
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref);
- ServerMessage m = ref.getMessage();
- ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
+ Message m = ref.getMessage();
+ ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes()];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, bytes);
@@ -285,8 +291,8 @@ public final class OpenTypeSupport {
@Override
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref);
- ServerMessage m = ref.getMessage();
- SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString();
+ Message m = ref.getMessage();
+ SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
return rc;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 9b1e243..b3d8adb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,9 +16,10 @@
*/
package org.apache.activemq.artemis.core.paging;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.ServerMessage;
/**
* A Paged message.
@@ -28,7 +29,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
*/
public interface PagedMessage extends EncodingSupport {
- ServerMessage getMessage();
+ Message getMessage();
/**
* The queues that were routed during paging
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 5ead1a2..a7de713 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -20,13 +20,15 @@ import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,7 +43,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*
* @see PagingManager
*/
-public interface PagingStore extends ActiveMQComponent {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
SimpleString getAddress();
@@ -90,7 +92,7 @@ public interface PagingStore extends ActiveMQComponent {
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
- boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
+ boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
Page createPage(final int page) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 768b43f..823eef4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,11 +20,11 @@ import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
+
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference {
private int persistedCount;
- private int messageEstimate;
+ private int messageEstimate = -1;
private Long consumerId;
@@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference {
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return getPagedMessage().getMessage();
}
@@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference {
final PagedMessage message,
final PageSubscription subscription) {
this.position = position;
-
- if (message == null) {
- this.messageEstimate = -1;
- } else {
- this.messageEstimate = message.getMessage().getMemoryEstimate();
- }
this.message = new WeakReference<>(message);
this.subscription = subscription;
}
@@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference {
@Override
public int getMessageMemoryEstimate() {
- if (messageEstimate < 0) {
+ if (messageEstimate <= 0) {
try {
messageEstimate = getMessage().getMemoryEstimate();
} catch (Throwable e) {
@@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference {
public long getScheduledDeliveryTime() {
if (deliveryTime == null) {
try {
- ServerMessage msg = getMessage();
+ Message msg = getMessage();
if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c40d20d..ab10eb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// Protected -----------------------------------------------------
- private boolean match(final ServerMessage message) {
+ private boolean match(final Message message) {
if (filter == null) {
return true;
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 4993d0c..aabec54 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -132,7 +132,7 @@ public final class Page implements Comparable<Page> {
int messageSize = fileBuffer.readInt();
int oldPos = fileBuffer.readerIndex();
if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
- PagedMessage msg = new PagedMessageImpl();
+ PagedMessage msg = new PagedMessageImpl(storageManager);
msg.decode(fileBuffer);
byte b = fileBuffer.readByte();
if (b != Page.END_BYTE) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index e40d107..d50dd2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@@ -38,39 +38,37 @@ public class PagedMessageImpl implements PagedMessage {
*/
private byte[] largeMessageLazyData;
- private ServerMessage message;
+ private Message message;
private long[] queueIDs;
private long transactionID = 0;
- public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) {
+ private volatile StorageManager storageManager;
+
+ public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
this(message, queueIDs);
this.transactionID = transactionID;
}
- public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) {
+ public PagedMessageImpl(final Message message, final long[] queueIDs) {
this.queueIDs = queueIDs;
this.message = message;
}
- public PagedMessageImpl() {
+ public PagedMessageImpl(StorageManager storageManager) {
+ this.storageManager = storageManager;
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return message;
}
@Override
public void initMessage(StorageManager storage) {
if (largeMessageLazyData != null) {
- LargeServerMessage lgMessage = storage.createLargeMessage();
- ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData);
- lgMessage.decodeHeadersAndProperties(buffer);
- lgMessage.incrementDelayDeletionCount();
- lgMessage.setPaged();
- message = lgMessage;
+ // TODO-now: use the largeMessagePersister
largeMessageLazyData = null;
}
}
@@ -96,15 +94,15 @@ public class PagedMessageImpl implements PagedMessage {
if (isLargeMessage) {
int largeMessageHeaderSize = buffer.readInt();
- largeMessageLazyData = new byte[largeMessageHeaderSize];
-
- buffer.readBytes(largeMessageLazyData);
+ if (storageManager == null) {
+ largeMessageLazyData = new byte[largeMessageHeaderSize];
+ buffer.readBytes(largeMessageLazyData);
+ } else {
+ this.message = storageManager.createLargeMessage();
+ LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message);
+ }
} else {
- buffer.readInt(); // This value is only used on LargeMessages for now
-
- message = new ServerMessageImpl(-1, 50);
-
- message.decode(buffer);
+ this.message = MessagePersister.getInstance().decode(buffer, null);
}
int queueIDsSize = buffer.readInt();
@@ -120,11 +118,16 @@ public class PagedMessageImpl implements PagedMessage {
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(transactionID);
- buffer.writeBoolean(message instanceof LargeServerMessage);
+ boolean isLargeMessage = isLargeMessage();
- buffer.writeInt(message.getEncodeSize());
+ buffer.writeBoolean(isLargeMessage);
- message.encode(buffer);
+ if (isLargeMessage) {
+ buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+ LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message);
+ } else {
+ message.getPersister().encode(buffer, message);
+ }
buffer.writeInt(queueIDs.length);
@@ -133,10 +136,19 @@ public class PagedMessageImpl implements PagedMessage {
}
}
+ private boolean isLargeMessage() {
+ return message.isLargeMessage();
+ }
+
@Override
public int getEncodeSize() {
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
- DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ if (isLargeMessage()) {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ } else {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..e39fe40 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -54,7 +55,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -699,7 +701,6 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void addSize(final int size) {
-
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
long newSize = sizeInBytes.addAndGet(size);
@@ -747,7 +748,7 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
- public boolean page(ServerMessage message,
+ public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
@@ -806,11 +807,12 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
- if (!message.isDurable()) {
- // The address should never be transient when paging (even for non-persistent messages when paging)
- // This will force everything to be persisted
- message.forceAddress(address);
- }
+ message.setAddress(address);
+// if (!message.isDurable()) {
+// // The address should never be transient when paging (even for non-persistent messages when paging)
+// // This will force everything to be persisted
+// message.forceAddress(address);
+// }
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
@@ -920,6 +922,40 @@ public class PagingStoreImpl implements PagingStore {
}
+ @Override
+ public void durableDown(Message message, int durableCount) {
+ }
+
+ @Override
+ public void durableUp(Message message, int durableCount) {
+ }
+
+ @Override
+ public void nonDurableUp(Message message, int count) {
+ if (count == 1) {
+ this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
+ } else {
+ this.addSize(MessageReferenceImpl.getMemoryEstimate());
+ }
+ }
+
+ @Override
+ public void nonDurableDown(Message message, int count) {
+ if (count < 0) {
+ // this could happen on paged messages since they are not routed and incrementRefCount is never called
+ return;
+ }
+
+ if (count == 0) {
+ this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
+
+ } else {
+ this.addSize(-MessageReferenceImpl.getMemoryEstimate());
+ }
+
+
+ }
+
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pgOper == null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index b45775c..e27ed30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -23,13 +23,13 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -172,7 +171,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
*/
void confirmPendingLargeMessage(long recordID) throws Exception;
- void storeMessage(ServerMessage message) throws Exception;
+ void storeMessage(Message message) throws Exception;
void storeReference(long queueID, long messageID, boolean last) throws Exception;
@@ -190,7 +189,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void deleteDuplicateID(long recordID) throws Exception;
- void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
+ void storeMessageTransactional(long txID, Message message) throws Exception;
void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception;
@@ -225,7 +224,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* @return a large message object
* @throws Exception
*/
- LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
+ LargeServerMessage createLargeMessage(long id, Message message) throws Exception;
enum LargeMessageExtension {
DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
@@ -265,11 +264,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
- /**
- * FIXME Unused
- */
- void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception;
-
void deletePageTransactional(long recordID) throws Exception;
JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
@@ -383,7 +377,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* needs to be sent to the journal
* @throws Exception
*/
- boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception;
+ boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception;
/**
* Stops the replication of data from the live to the backup.