You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 16:50:08 UTC
[07/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
new file mode 100644
index 0000000..186900b
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+// TODO: Implement this
+public class OpenwireMessage implements Message {
+
+ @Override
+ public boolean containsProperty(SimpleString key) {
+ return false;
+ }
+
+ @Override
+ public void messageChanged() {
+
+ }
+
+ @Override
+ public RoutingType getRouteType() {
+ return null;
+ }
+
+ @Override
+ public SimpleString getReplyTo() {
+ return null;
+ }
+
+ @Override
+ public Message setReplyTo(SimpleString address) {
+ return null;
+ }
+
+ @Override
+ public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+ return false;
+ }
+
+ @Override
+ public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Object getDeliveryAnnotationProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Long getScheduledDeliveryTime() {
+ return null;
+ }
+
+ @Override
+ public RefCountMessageListener getContext() {
+ return null;
+ }
+
+ @Override
+ public Message setContext(RefCountMessageListener context) {
+ return null;
+ }
+
+ @Override
+ public Message setBuffer(ByteBuf buffer) {
+ return null;
+ }
+
+ @Override
+ public ByteBuf getBuffer() {
+ return null;
+ }
+
+ @Override
+ public Message copy() {
+ return null;
+ }
+
+ @Override
+ public Message copy(long newID) {
+ return null;
+ }
+
+ @Override
+ public long getMessageID() {
+ return 0;
+ }
+
+ @Override
+ public Message setMessageID(long id) {
+ return null;
+ }
+
+ @Override
+ public long getExpiration() {
+ return 0;
+ }
+
+ @Override
+ public Message setExpiration(long expiration) {
+ return null;
+ }
+
+ @Override
+ public Object getUserID() {
+ return null;
+ }
+
+ @Override
+ public Message setUserID(Object userID) {
+ return null;
+ }
+
+ @Override
+ public void copyHeadersAndProperties(Message msg) {
+
+ }
+
+ @Override
+ public boolean isDurable() {
+ return false;
+ }
+
+ @Override
+ public Message setDurable(boolean durable) {
+ return null;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return null;
+ }
+
+ @Override
+ public String getAddress() {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(String address) {
+ return null;
+ }
+
+ @Override
+ public SimpleString getAddressSimpleString() {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(SimpleString address) {
+ return null;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return 0;
+ }
+
+ @Override
+ public Message setTimestamp(long timestamp) {
+ return null;
+ }
+
+ @Override
+ public byte getPriority() {
+ return 0;
+ }
+
+ @Override
+ public Message setPriority(byte priority) {
+ return null;
+ }
+
+ @Override
+ public void receiveBuffer(ByteBuf buffer) {
+
+ }
+
+ @Override
+ public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+
+ }
+
+ @Override
+ public int getPersistSize() {
+ return 0;
+ }
+
+ @Override
+ public void persist(ActiveMQBuffer targetRecord) {
+
+ }
+
+ @Override
+ public void reloadPersistence(ActiveMQBuffer record) {
+
+ }
+
+ @Override
+ public Message putBooleanProperty(String key, boolean value) {
+ return null;
+ }
+
+ @Override
+ public Message putByteProperty(String key, byte value) {
+ return null;
+ }
+
+ @Override
+ public Message putBytesProperty(String key, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public Message putShortProperty(String key, short value) {
+ return null;
+ }
+
+ @Override
+ public Message putCharProperty(String key, char value) {
+ return null;
+ }
+
+ @Override
+ public Message putIntProperty(String key, int value) {
+ return null;
+ }
+
+ @Override
+ public Message putLongProperty(String key, long value) {
+ return null;
+ }
+
+ @Override
+ public Message putFloatProperty(String key, float value) {
+ return null;
+ }
+
+ @Override
+ public Message putDoubleProperty(String key, double value) {
+ return null;
+ }
+
+ @Override
+ public Message putBooleanProperty(SimpleString key, boolean value) {
+ return null;
+ }
+
+ @Override
+ public Message putByteProperty(SimpleString key, byte value) {
+ return null;
+ }
+
+ @Override
+ public Message putBytesProperty(SimpleString key, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public Message putShortProperty(SimpleString key, short value) {
+ return null;
+ }
+
+ @Override
+ public Message putCharProperty(SimpleString key, char value) {
+ return null;
+ }
+
+ @Override
+ public Message putIntProperty(SimpleString key, int value) {
+ return null;
+ }
+
+ @Override
+ public Message putLongProperty(SimpleString key, long value) {
+ return null;
+ }
+
+ @Override
+ public Message putFloatProperty(SimpleString key, float value) {
+ return null;
+ }
+
+ @Override
+ public Message putDoubleProperty(SimpleString key, double value) {
+ return null;
+ }
+
+ @Override
+ public Message putStringProperty(String key, String value) {
+ return null;
+ }
+
+ @Override
+ public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object removeProperty(String key) {
+ return null;
+ }
+
+ @Override
+ public boolean containsProperty(String key) {
+ return false;
+ }
+
+ @Override
+ public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object getObjectProperty(String key) {
+ return null;
+ }
+
+ @Override
+ public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+ return new byte[0];
+ }
+
+ @Override
+ public Object removeProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object getObjectProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return new byte[0];
+ }
+
+ @Override
+ public Message putStringProperty(SimpleString key, SimpleString value) {
+ return null;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ return 0;
+ }
+
+ @Override
+ public Set<SimpleString> getPropertyNames() {
+ return null;
+ }
+
+ @Override
+ public int getRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int incrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int decrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int incrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int decrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public ICoreMessage toCore() {
+ return null;
+ }
+
+ @Override
+ public int getMemoryEstimate() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index f471a2a..3bdee8b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,15 +27,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -208,7 +209,7 @@ public class AMQConsumer {
}
- public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
+ public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
MessageDispatch dispatch;
try {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -394,7 +395,7 @@ public class AMQConsumer {
}
}
- public boolean checkForcedConsumer(ServerMessage message) {
+ public boolean checkForcedConsumer(Message message) {
if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
if (next >= 0) {
if (timeout <= 0) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 79004ae..b5d2c86 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -34,9 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.reader.MessageUtil;
@@ -231,16 +230,17 @@ public class AMQSession implements SessionCallback {
@Override
public int sendMessage(MessageReference reference,
- ServerMessage message,
+ org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumer,
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
- return theConsumer.handleDeliver(reference, message, deliveryCount);
+ // TODO: use encoders and proper conversions here
+ return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
}
@Override
public int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumerID,
long bodySize,
int deliveryCount) {
@@ -296,7 +296,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+ org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
if (connection.isNoLocal()) {
//Note: advisory messages are dealt with in
@@ -324,7 +324,7 @@ public class AMQSession implements SessionCallback {
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
SimpleString address = new SimpleString(dest.getPhysicalName());
- ServerMessage coreMsg = originalCoreMsg.copy();
+ org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 5355c63..c84776b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -53,9 +54,14 @@ public class OpenWireUtil {
* set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
* consumer
*/
- public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
- String address = message.getAddress().toString();
+ public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) {
+ String address = message.getAddress();
String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
+
+ if (address == null) {
+ return actualDestination;
+ }
+
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 861c524..d377abd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
@@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle {
ActiveMQStompException invalidConnection();
@Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e);
+ ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e);
@Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQStompException errorBeginTx(String txID, @Cause Exception e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index c004a0e..56067f1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection {
return valid;
}
- public ServerMessageImpl createServerMessage() {
+ public CoreMessage createServerMessage() {
return manager.createServerMessage();
}
@@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection {
}
}
- protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException {
+ protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
StompSession stompSession = getSession(txID);
if (stompSession.isNoLocal()) {
@@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection {
if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
stompSession.sendInternal(message, false);
} else {
- stompSession.sendInternalLarge(message, false);
+ stompSession.sendInternalLarge((CoreMessage)message, false);
}
} catch (Exception e) {
throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
@@ -726,10 +726,11 @@ public final class StompConnection implements RemotingConnection {
return SERVER_NAME;
}
- public StompFrame createStompMessage(ServerMessage serverMessage,
+ public StompFrame createStompMessage(ICoreMessage serverMessage,
+ ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
int deliveryCount) throws Exception {
- return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
+ return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
}
public void addStompEventListener(FrameEventListener listener) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 54339a4..39d2fe9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -33,15 +33,14 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -109,13 +108,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
@Override
- public MessageConverter getConverter() {
- return null;
- }
-
- // ProtocolManager implementation --------------------------------
-
- @Override
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
@@ -345,8 +337,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
return validated;
}
- public ServerMessageImpl createServerMessage() {
- return new ServerMessageImpl(server.getStorageManager().generateID(), 512);
+ public CoreMessage createServerMessage() {
+ return new CoreMessage(server.getStorageManager().generateID(), 512);
}
public void commitTransaction(StompConnection connection, String txID) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 1e103e9..797a966 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,23 +25,24 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.zip.Inflater;
+import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -127,32 +128,35 @@ public class StompSession implements SessionCallback {
@Override
public int sendMessage(MessageReference ref,
- ServerMessage serverMessage,
+ Message serverMessage,
final ServerConsumer consumer,
int deliveryCount) {
+
+ ICoreMessage coreMessage = serverMessage.toCore();
+
LargeServerMessageImpl largeMessage = null;
- ServerMessage newServerMessage = serverMessage;
+ ICoreMessage newServerMessage = serverMessage.toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
- StompFrame frame = null;
- if (serverMessage.isLargeMessage()) {
- newServerMessage = serverMessage.copy();
+ StompFrame frame;
+ ActiveMQBuffer buffer;
- largeMessage = (LargeServerMessageImpl) serverMessage;
- BodyEncoder encoder = largeMessage.getBodyEncoder();
+ if (coreMessage.isLargeMessage()) {
+ LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
encoder.open();
int bodySize = (int) encoder.getLargeBodySize();
- //large message doesn't have a body.
- ((ServerMessageImpl) newServerMessage).createBody(bodySize);
- encoder.encode(newServerMessage.getBodyBuffer(), bodySize);
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
encoder.close();
+ } else {
+ buffer = coreMessage.getReadOnlyBodyBuffer();
}
if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
- //decompress
- ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer();
- int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET;
+ ActiveMQBuffer qbuff = buffer;
+ int bytesToRead = qbuff.readerIndex();
Inflater inflater = new Inflater();
inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
@@ -165,9 +169,10 @@ public class StompSession implements SessionCallback {
qbuff.resetReaderIndex();
qbuff.resetWriterIndex();
qbuff.writeBytes(data);
+ buffer = qbuff;
}
- frame = connection.createStompMessage(newServerMessage, subscription, deliveryCount);
+ frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
int length = frame.getEncodedSize();
@@ -219,7 +224,7 @@ public class StompSession implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference ref,
- ServerMessage msg,
+ Message msg,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
@@ -370,11 +375,11 @@ public class StompSession implements SessionCallback {
this.noLocal = noLocal;
}
- public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception {
+ public void sendInternal(Message message, boolean direct) throws Exception {
session.send(message, direct);
}
- public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception {
+ public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception {
int headerSize = message.getHeadersAndPropertiesEncodeSize();
if (headerSize >= connection.getMinLargeMessageSize()) {
throw BUNDLE.headerTooBig();
@@ -384,7 +389,7 @@ public class StompSession implements SessionCallback {
long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
- byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET];
+ byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
message.getBodyBuffer().readBytes(bytes);
largeMessage.addBytes(bytes);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index affab84..7db9d82 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -24,8 +24,6 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
public class StompUtils {
@@ -37,7 +35,7 @@ public class StompUtils {
// Static --------------------------------------------------------
- public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception {
+ public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
@@ -79,7 +77,7 @@ public class StompUtils {
}
}
- public static void copyStandardHeadersFromMessageToFrame(MessageInternal message,
+ public static void copyStandardHeadersFromMessageToFrame(Message message,
StompFrame command,
int deliveryCount) throws Exception {
command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index f91ba82..3f68c6f 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -20,17 +20,15 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler {
long timestamp = System.currentTimeMillis();
- ServerMessageImpl message = connection.createServerMessage();
+ CoreMessage message = connection.createServerMessage();
if (routingType != null) {
message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
}
@@ -289,7 +287,8 @@ public abstract class VersionedStompFrameHandler {
return response;
}
- public StompFrame createMessageFrame(ServerMessage serverMessage,
+ public StompFrame createMessageFrame(ICoreMessage serverMessage,
+ ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
int deliveryCount) throws Exception {
StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -298,13 +297,11 @@ public abstract class VersionedStompFrameHandler {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
- ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+ ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
+ int size = buffer.writerIndex();
byte[] data = new byte[size];
@@ -321,7 +318,7 @@ public abstract class VersionedStompFrameHandler {
}
frame.setByteBody(data);
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+ StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount);
return frame;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6b211d2..77a9225 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -27,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -48,10 +49,11 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
+ public StompFrame createMessageFrame(ICoreMessage serverMessage,
+ ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
int deliveryCount) throws Exception {
- StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);
+ StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7881470..30d6668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -750,10 +750,6 @@ public interface Configuration {
Configuration setLogJournalWriteRate(boolean rate);
- int getJournalPerfBlastPages();
-
- Configuration setJournalPerfBlastPages(int pages);
-
long getServerDumpInterval();
Configuration setServerDumpInterval(long interval);
@@ -766,10 +762,6 @@ public interface Configuration {
Configuration setMemoryMeasureInterval(long memoryMeasureInterval);
- boolean isRunSyncSpeedTest();
-
- Configuration setRunSyncSpeedTest(boolean run);
-
// Paging Properties --------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index f4eda91..329f654 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate();
- protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages();
-
- protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest();
-
private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled();
@@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
- public int getJournalPerfBlastPages() {
- return journalPerfBlastPages;
- }
-
- @Override
- public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) {
- this.journalPerfBlastPages = journalPerfBlastPages;
- return this;
- }
-
- @Override
- public boolean isRunSyncSpeedTest() {
- return runSyncSpeedTest;
- }
-
- @Override
- public ConfigurationImpl setRunSyncSpeedTest(final boolean run) {
- runSyncSpeedTest = run;
- return this;
- }
-
- @Override
public boolean isCreateBindingsDir() {
return createBindingsDir;
}
@@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
result = prime * result + journalMaxIO_AIO;
result = prime * result + journalMaxIO_NIO;
result = prime * result + journalMinFiles;
- result = prime * result + journalPerfBlastPages;
result = prime * result + (journalSyncNonTransactional ? 1231 : 1237);
result = prime * result + (journalSyncTransactional ? 1231 : 1237);
result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
@@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
result = prime * result + (persistIDCache ? 1231 : 1237);
result = prime * result + (persistenceEnabled ? 1231 : 1237);
result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode());
- result = prime * result + (runSyncSpeedTest ? 1231 : 1237);
result = prime * result + scheduledThreadPoolMaxSize;
result = prime * result + (securityEnabled ? 1231 : 1237);
result = prime * result + (populateValidatedUser ? 1231 : 1237);
@@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return false;
if (journalMinFiles != other.journalMinFiles)
return false;
- if (journalPerfBlastPages != other.journalPerfBlastPages)
- return false;
if (journalSyncNonTransactional != other.journalSyncNonTransactional)
return false;
if (journalSyncTransactional != other.journalSyncTransactional)
@@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return false;
} else if (!queueConfigurations.equals(other.queueConfigurations))
return false;
- if (runSyncSpeedTest != other.runSyncSpeedTest)
- return false;
if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize)
return false;
if (securityEnabled != other.securityEnabled)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index cea0598..4055b5c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate()));
- config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO));
-
- config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest()));
-
if (e.hasAttribute("wild-card-routing-enabled")) {
config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 41d5e54..3737e19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.core.filter;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
public interface Filter {
@@ -31,7 +31,7 @@ public interface Filter {
*/
String GENERIC_IGNORED_FILTER = "__AMQX=-1";
- boolean match(ServerMessage message);
+ boolean match(Message message);
SimpleString getFilterString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 0a459c9..33a1187 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.selector.filter.BooleanExpression;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.filter.Filterable;
@@ -103,7 +103,7 @@ public class FilterImpl implements Filter {
}
@Override
- public synchronized boolean match(final ServerMessage message) {
+ public synchronized boolean match(final Message message) {
try {
boolean result = booleanExpression.matches(new FilterableServerMessage(message));
return result;
@@ -148,7 +148,7 @@ public class FilterImpl implements Filter {
// Private --------------------------------------------------------------------------
- private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
+ private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) {
if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
if (msg.getUserID() == null) {
// Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string
@@ -158,7 +158,12 @@ public class FilterImpl implements Filter {
}
}
// It's the stringified (hex) representation of a user id that can be used in a selector expression
- return new SimpleString("ID:" + msg.getUserID());
+ String userID = msg.getUserID().toString();
+ if (userID.startsWith("ID:")) {
+ return SimpleString.toSimpleString(userID);
+ } else {
+ return new SimpleString("ID:" + msg.getUserID());
+ }
} else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) {
return Integer.valueOf(msg.getPriority());
} else if (FilterConstants.ACTIVEMQ_TIMESTAMP.equals(fieldName)) {
@@ -178,9 +183,9 @@ public class FilterImpl implements Filter {
private static class FilterableServerMessage implements Filterable {
- private final ServerMessage message;
+ private final Message message;
- private FilterableServerMessage(ServerMessage message) {
+ private FilterableServerMessage(Message message) {
this.message = message;
}
@@ -191,7 +196,7 @@ public class FilterImpl implements Filter {
result = getHeaderFieldValue(message, new SimpleString(id));
}
if (result == null) {
- result = message.getObjectProperty(new SimpleString(id));
+ result = message.getObjectProperty(id);
}
if (result != null) {
if (result.getClass() == SimpleString.class) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 09dd702..31e056c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return null;
}
});
- ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+ CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4b84909..5ecea64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
Filter singleMessageFilter = new Filter() {
@Override
- public boolean match(ServerMessage message) {
+ public boolean match(Message message) {
return message.getMessageID() == messageID;
}
@@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
return null;
}
});
- ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+ CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
@@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index ec6848b..098c61c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -32,10 +32,10 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
public final class OpenTypeSupport {
@@ -48,8 +48,10 @@ public final class OpenTypeSupport {
public static CompositeData convert(MessageReference ref) throws OpenDataException {
CompositeType ct;
+ ICoreMessage message = ref.getMessage().toCore();
+
Map<String, Object> fields;
- byte type = ref.getMessage().getType();
+ byte type = message.getType();
switch(type) {
case Message.TEXT_TYPE:
@@ -128,7 +130,7 @@ public final class OpenTypeSupport {
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = new HashMap<>();
- Message m = ref.getMessage();
+ ICoreMessage m = ref.getMessage().toCore();
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
if (m.getUserID() != null) {
rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
@@ -143,6 +145,11 @@ public final class OpenTypeSupport {
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
+ ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
+ byte[] bytes = new byte[bodyCopy.readableBytes()];
+ bodyCopy.readBytes(bytes);
+ rc.put(CompositeDataConstants.BODY, bytes);
+
Map<String, Object> propertyMap = m.toPropertyMap();
rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
@@ -264,8 +271,8 @@ public final class OpenTypeSupport {
@Override
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref);
- ServerMessage m = ref.getMessage();
- ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
+ ICoreMessage m = ref.getMessage().toCore();
+ ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes()];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, bytes);
@@ -285,8 +292,8 @@ public final class OpenTypeSupport {
@Override
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref);
- ServerMessage m = ref.getMessage();
- SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString();
+ ICoreMessage m = ref.getMessage().toCore();
+ SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
return rc;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 9b1e243..0124f09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.paging;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.ServerMessage;
/**
* A Paged message.
@@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
*/
public interface PagedMessage extends EncodingSupport {
- ServerMessage getMessage();
+ Message getMessage();
/**
* The queues that were routed during paging
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 5ead1a2..2d4c646 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -20,13 +20,14 @@ import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*
* @see PagingManager
*/
-public interface PagingStore extends ActiveMQComponent {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
SimpleString getAddress();
@@ -90,7 +91,7 @@ public interface PagingStore extends ActiveMQComponent {
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
- boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
+ boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
Page createPage(final int page) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 768b43f..823eef4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,11 +20,11 @@ import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
+
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference {
private int persistedCount;
- private int messageEstimate;
+ private int messageEstimate = -1;
private Long consumerId;
@@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference {
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return getPagedMessage().getMessage();
}
@@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference {
final PagedMessage message,
final PageSubscription subscription) {
this.position = position;
-
- if (message == null) {
- this.messageEstimate = -1;
- } else {
- this.messageEstimate = message.getMessage().getMemoryEstimate();
- }
this.message = new WeakReference<>(message);
this.subscription = subscription;
}
@@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference {
@Override
public int getMessageMemoryEstimate() {
- if (messageEstimate < 0) {
+ if (messageEstimate <= 0) {
try {
messageEstimate = getMessage().getMemoryEstimate();
} catch (Throwable e) {
@@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference {
public long getScheduledDeliveryTime() {
if (deliveryTime == null) {
try {
- ServerMessage msg = getMessage();
+ Message msg = getMessage();
if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c40d20d..ab10eb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// Protected -----------------------------------------------------
- private boolean match(final ServerMessage message) {
+ private boolean match(final Message message) {
if (filter == null) {
return true;
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 4993d0c..7d21316 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -132,7 +133,7 @@ public final class Page implements Comparable<Page> {
int messageSize = fileBuffer.readInt();
int oldPos = fileBuffer.readerIndex();
if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
- PagedMessage msg = new PagedMessageImpl();
+ PagedMessage msg = new PagedMessageImpl(storageManager);
msg.decode(fileBuffer);
byte b = fileBuffer.readByte();
if (b != Page.END_BYTE) {
@@ -255,7 +256,7 @@ public final class Page implements Comparable<Page> {
if (messages != null) {
for (PagedMessage msg : messages) {
- if (msg.getMessage().isLargeMessage()) {
+ if (msg.getMessage() instanceof ICoreMessage && (msg.getMessage()).isLargeMessage()) {
LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage();
// Remember, cannot call delete directly here
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index e40d107..7d43a2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -19,12 +19,13 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@@ -38,39 +39,37 @@ public class PagedMessageImpl implements PagedMessage {
*/
private byte[] largeMessageLazyData;
- private ServerMessage message;
+ private Message message;
private long[] queueIDs;
private long transactionID = 0;
- public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) {
+ private volatile StorageManager storageManager;
+
+ public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
this(message, queueIDs);
this.transactionID = transactionID;
}
- public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) {
+ public PagedMessageImpl(final Message message, final long[] queueIDs) {
this.queueIDs = queueIDs;
this.message = message;
}
- public PagedMessageImpl() {
+ public PagedMessageImpl(StorageManager storageManager) {
+ this.storageManager = storageManager;
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return message;
}
@Override
public void initMessage(StorageManager storage) {
if (largeMessageLazyData != null) {
- LargeServerMessage lgMessage = storage.createLargeMessage();
- ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData);
- lgMessage.decodeHeadersAndProperties(buffer);
- lgMessage.incrementDelayDeletionCount();
- lgMessage.setPaged();
- message = lgMessage;
+ // TODO-now: use the largeMessagePersister
largeMessageLazyData = null;
}
}
@@ -96,15 +95,15 @@ public class PagedMessageImpl implements PagedMessage {
if (isLargeMessage) {
int largeMessageHeaderSize = buffer.readInt();
- largeMessageLazyData = new byte[largeMessageHeaderSize];
-
- buffer.readBytes(largeMessageLazyData);
+ if (storageManager == null) {
+ largeMessageLazyData = new byte[largeMessageHeaderSize];
+ buffer.readBytes(largeMessageLazyData);
+ } else {
+ this.message = storageManager.createLargeMessage();
+ LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message);
+ }
} else {
- buffer.readInt(); // This value is only used on LargeMessages for now
-
- message = new ServerMessageImpl(-1, 50);
-
- message.decode(buffer);
+ this.message = MessagePersister.getInstance().decode(buffer, null);
}
int queueIDsSize = buffer.readInt();
@@ -120,11 +119,16 @@ public class PagedMessageImpl implements PagedMessage {
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(transactionID);
- buffer.writeBoolean(message instanceof LargeServerMessage);
+ boolean isLargeMessage = isLargeMessage();
- buffer.writeInt(message.getEncodeSize());
+ buffer.writeBoolean(isLargeMessage);
- message.encode(buffer);
+ if (isLargeMessage) {
+ buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+ LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message);
+ } else {
+ message.getPersister().encode(buffer, message);
+ }
buffer.writeInt(queueIDs.length);
@@ -133,10 +137,19 @@ public class PagedMessageImpl implements PagedMessage {
}
}
+ public boolean isLargeMessage() {
+ return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage();
+ }
+
@Override
public int getEncodeSize() {
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
- DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ if (isLargeMessage()) {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ } else {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..c300bd1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -54,7 +55,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -699,7 +700,6 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void addSize(final int size) {
-
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
long newSize = sizeInBytes.addAndGet(size);
@@ -747,7 +747,7 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
- public boolean page(ServerMessage message,
+ public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
@@ -806,11 +806,12 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
- if (!message.isDurable()) {
- // The address should never be transient when paging (even for non-persistent messages when paging)
- // This will force everything to be persisted
- message.forceAddress(address);
- }
+ message.setAddress(address);
+// if (!message.isDurable()) {
+// // The address should never be transient when paging (even for non-persistent messages when paging)
+// // This will force everything to be persisted
+// message.forceAddress(address);
+// }
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
@@ -920,6 +921,40 @@ public class PagingStoreImpl implements PagingStore {
}
+ @Override
+ public void durableDown(Message message, int durableCount) {
+ }
+
+ @Override
+ public void durableUp(Message message, int durableCount) {
+ }
+
+ @Override
+ public void nonDurableUp(Message message, int count) {
+ if (count == 1) {
+ this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
+ } else {
+ this.addSize(MessageReferenceImpl.getMemoryEstimate());
+ }
+ }
+
+ @Override
+ public void nonDurableDown(Message message, int count) {
+ if (count < 0) {
+ // this could happen on paged messages since they are not routed and incrementRefCount is never called
+ return;
+ }
+
+ if (count == 0) {
+ this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
+
+ } else {
+ this.addSize(-MessageReferenceImpl.getMemoryEstimate());
+ }
+
+
+ }
+
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pgOper == null) {