You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 17:21:10 UTC
[16/23] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
new file mode 100644
index 0000000..f7821b9
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+import java.nio.ByteBuffer;
+
+public interface MessageBody {
+ Object getBody();
+
+ ByteBuffer getBodyArray();
+
+ BodyType getType();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 900305f..b5d5474 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -20,18 +20,18 @@ import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
/**
* A ResetLimitWrappedActiveMQBuffer
- * TODO: Move this to commons
*/
public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper {
private final int limit;
- private MessageInternal message;
+ private Message message;
/**
* We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions,
@@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
*
* @param message
*/
- public void setMessage(MessageInternal message) {
+ public void setMessage(Message message) {
this.message = message;
}
- public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
+ public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
}
- public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
+ public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
super(buffer);
@@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
private void changed() {
if (message != null) {
- message.bodyChanged();
+ message.messageChanged();
}
}
@@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
@Override
public void resetReaderIndex() {
- changed();
-
buffer.readerIndex(limit);
}
@@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
super.writeBytes(src);
}
+
+ @Override
+ public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
@Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
changed();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index c3cbceb..cbfaf6f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public int getEncodeSize() {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
return super.getEncodeSize();
} else {
return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
@@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
throw new RuntimeException(e.getMessage(), e);
}
- return bodyBuffer;
+ return writableBuffer;
}
@Override
@@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
// The body was rebuilt on the client, so we need to behave as a regular message on this case
super.saveToOutputStream(out);
} else {
@@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
super.setOutputStream(out);
} else {
largeMessageController.setOutputStream(out);
@@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
return super.waitOutputStreamCompletion(timeMilliseconds);
} else {
return largeMessageController.waitCompletion(timeMilliseconds);
@@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public void discardBody() {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
super.discardBody();
} else {
largeMessageController.discardUnusedPackets();
@@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
}
private void checkBuffer() throws ActiveMQException {
- if (bodyBuffer == null) {
+ if (writableBuffer == null) {
long bodySize = this.largeMessageSize + BODY_OFFSET;
if (bodySize > Integer.MAX_VALUE) {
bodySize = Integer.MAX_VALUE;
}
- createBody((int) bodySize);
+ initBuffer((int) bodySize);
- bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+ writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
- largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer));
+ largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));
}
}
@@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
public void retrieveExistingData(ClientMessageInternal clMessage) {
this.messageID = clMessage.getMessageID();
- this.address = clMessage.getAddress();
+ this.address = clMessage.getAddressSimpleString();
this.setUserID(clMessage.getUserID());
this.setFlowControlSize(clMessage.getFlowControlSize());
this.setDeliveryCount(clMessage.getDeliveryCount());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 7bf8eb7..9472b01 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
/**
* A ClientMessageImpl
*/
-public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal {
+public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
// added this constant here so that the client package have no dependency on JMS
public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
@@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
public ClientMessageImpl() {
}
+ protected ClientMessageImpl(ClientMessageImpl other) {
+ super(other);
+ }
+
+ @Override
+ public ClientMessageImpl setDurable(boolean durable) {
+ super.setDurable(durable);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setExpiration(long expiration) {
+ super.setExpiration(expiration);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setPriority(byte priority) {
+ super.setPriority(priority);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setUserID(UUID userID) {
+
+ return this;
+ }
+
+
/*
* Construct messages before sending
*/
@@ -66,12 +97,18 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
final long timestamp,
final byte priority,
final int initialMessageBufferSize) {
- super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
+ this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
+ setPriority(priority).initBuffer(initialMessageBufferSize);
}
@Override
- public boolean isServerMessage() {
- return false;
+ public void setAddressTransient(SimpleString address) {
+ this.address = address;
+ }
+
+ @Override
+ public TypedProperties getProperties() {
+ return this.checkProperties();
}
@Override
@@ -108,6 +145,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
return this;
}
+
+ @Override
+ public void checkCompletion() throws ActiveMQException {
+ }
+
@Override
public int getFlowControlSize() {
if (flowControlSize < 0) {
@@ -141,7 +183,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public String toString() {
- return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+ return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + getProperties().toString() + "]";
}
@Override
@@ -189,7 +231,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
}
@Override
- public BodyEncoder getBodyEncoder() throws ActiveMQException {
+ public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
return new DecodingContext();
}
@@ -307,15 +349,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) {
- return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+ getBodyBuffer().writeBytes(bytes);
+ return this;
}
@Override
public ClientMessageImpl writeBodyBufferString(String string) {
- return (ClientMessageImpl) super.writeBodyBufferString(string);
+ getBodyBuffer().writeString(string);
+ return this;
}
- private final class DecodingContext implements BodyEncoder {
+ private final class DecodingContext implements LargeBodyEncoder {
private DecodingContext() {
}
@@ -347,9 +391,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public int encode(final ActiveMQBuffer bufferOut, final int size) {
byte[] bytes = new byte[size];
- getWholeBuffer().readBytes(bytes);
+ buffer.readBytes(bytes);
bufferOut.writeBytes(bytes, 0, size);
return size;
}
}
+
+ @Override
+ public Message copy() {
+ return new ClientMessageImpl(this);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 07d4719..878f799 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.client.impl;
+import java.io.InputStream;
+
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.utils.TypedProperties;
@@ -44,4 +46,7 @@ public interface ClientMessageInternal extends ClientMessage {
void discardBody();
boolean isCompressed();
+
+ InputStream getBodyInputStream();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 1dfbe72..ce16011 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -27,8 +27,8 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
import org.apache.activemq.artemis.utils.DeflaterReader;
@@ -217,7 +217,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.startCall();
try {
- MessageInternal msgI = (MessageInternal) msg;
+ Message msgI = msg;
ClientProducerCredits theCredits;
@@ -225,8 +225,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
// a note about the second check on the writerIndexSize,
// If it's a server's message, it means this is being done through the bridge or some special consumer on the
// server's on which case we can't' convert the message into large at the servers
- if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
- msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) {
+ if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msgI) != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
isLarge = true;
} else {
isLarge = false;
@@ -258,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.workDone();
if (isLarge) {
- largeMessageSend(sendBlocking, msgI, theCredits, handler);
+ largeMessageSend(sendBlocking, (CoreMessage)msgI, theCredits, handler);
} else {
sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler);
}
@@ -267,8 +267,12 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
}
+ private InputStream getBodyInputStream(Message msgI) {
+ return ((ClientMessageInternal)msgI).getBodyInputStream();
+ }
+
private void sendRegularMessage(final SimpleString sendingAddress,
- final MessageInternal msgI,
+ final Message msgI,
final boolean sendBlocking,
final ClientProducerCredits theCredits,
final SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -301,7 +305,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSend(final boolean sendBlocking,
- final MessageInternal msgI,
+ final CoreMessage msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking);
@@ -313,22 +317,22 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
// msg.getBody() could be Null on LargeServerMessage
- if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) {
- msgI.getWholeBuffer().readerIndex(0);
+ if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) {
+ msgI.getBuffer().readerIndex(0);
}
InputStream input;
if (msgI.isServerMessage()) {
largeMessageSendServer(sendBlocking, msgI, credits, handler);
- } else if ((input = msgI.getBodyInputStream()) != null) {
+ } else if ((input = getBodyInputStream(msgI)) != null) {
largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
} else {
largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
}
}
- private void sendInitialLargeMessageHeader(MessageInternal msgI,
+ private void sendInitialLargeMessageHeader(Message msgI,
ClientProducerCredits credits) throws ActiveMQException {
int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
@@ -348,17 +352,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendServer(final boolean sendBlocking,
- final MessageInternal msgI,
+ final Message msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
sendInitialLargeMessageHeader(msgI, credits);
- BodyEncoder context = msgI.getBodyEncoder();
+ LargeBodyEncoder context = msgI.getBodyEncoder();
final long bodySize = context.getLargeBodySize();
-
- final int reconnectID = sessionContext.getReconnectID();
-
context.open();
try {
@@ -392,7 +393,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendBuffered(final boolean sendBlocking,
- final MessageInternal msgI,
+ final Message msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
msgI.getBodyBuffer().readerIndex(0);
@@ -407,7 +408,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final MessageInternal msgI,
+ final Message msgI,
final InputStream inputStreamParameter,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -478,7 +479,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
msgI.getBodyBuffer().writeBytes(buff, 0, pos);
- sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler);
+ sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
return;
} else {
if (!headerSent) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+
+ @Override
public ByteBuffer toByteBuffer() {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
+ /**
+ * Transfers the specified source buffer's data to this buffer starting at
+ * the current {@code writerIndex} until the source buffer's position
+ * reaches its limit, and increases the {@code writerIndex} by the
+ * number of the transferred bytes.
+ *
+ * @param src The source buffer
+ * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+ * {@code this.writableBytes}
+ */
+ @Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+ }
+
public int writeBytes(final InputStream in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
deleted file mode 100644
index baafaac..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-/**
- * Class used to encode message body into buffers.
- * <br>
- * Used to send large streams over the wire
- */
-public interface BodyEncoder {
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- void open() throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- void close() throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- int encode(ByteBuffer bufferRead) throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- long getLargeBodySize();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
new file mode 100644
index 0000000..8b96282
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+
+/**
+ * Class used to encode message body into buffers.
+ * <br>
+ * Used to send large streams over the wire
+ */
+public interface LargeBodyEncoder {
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ void open() throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ void close() throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ int encode(ByteBuffer bufferRead) throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ long getLargeBodySize();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
new file mode 100644
index 0000000..fd09751
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -0,0 +1,1066 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
+import org.jboss.logging.Logger;
+
+/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
+ * consumers */
+public class CoreMessage extends RefCountMessage {
+
+ public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+
+ // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
+ // Note, it is only an estimate, it's not possible to be entirely sure with Java
+ // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+ // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+ private static final int memoryOffset = 352;
+
+ private volatile int memoryEstimate = -1;
+
+
+ private static final Logger logger = Logger.getLogger(CoreMessage.class);
+
+ // There's an integer with the number of bytes for the body
+ public static final int BODY_OFFSET = DataConstants.SIZE_INT;
+
+ /** That is the encode for the whole message, including properties..
+ it does not include the buffer for the Packet send and receive header on core protocol */
+ protected ByteBuf buffer;
+
+ private volatile boolean validBuffer = false;
+
+ protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
+
+ Object body;
+
+ protected int endOfBodyPosition = -1;
+
+ protected int messageIDPosition = -1;
+
+ protected long messageID;
+
+ protected SimpleString address;
+
+ protected byte type;
+
+ protected boolean durable;
+
+ /**
+ * GMT milliseconds at which this message expires. 0 means never expires *
+ */
+ private long expiration;
+
+ protected long timestamp;
+
+ protected byte priority;
+
+ private UUID userID;
+
+ private int propertiesLocation = -1;
+
+ protected volatile TypedProperties properties;
+
+ private Object protocol;
+
+ public CoreMessage() {
+ }
+
+ @Override
+ public CoreMessage setProtocol(Object protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
+ @Override
+ public Object getProtocol() {
+ return protocol;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return CoreMessagePersister.getInstance();
+ }
+
+ public CoreMessage initBuffer(final int initialMessageBufferSize) {
+ buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
+
+ // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+ buffer.writeByte((byte) 0);
+
+ buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
+
+ return this;
+ }
+
+ @Override
+ public void receiveBuffer(ByteBuf buffer) {
+ this.buffer = buffer;
+ this.buffer.retain();
+ decode();
+ this.validBuffer = true;
+ }
+
+ @Override
+ public ActiveMQBuffer getReadOnlyBodyBuffer() {
+ internalWritableBuffer();
+ return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
+ }
+
+ /**
+ *
+ * @param sendBuffer
+ * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
+ */
+ @Override
+ public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
+ checkEncode();
+ sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+ }
+
+ private synchronized void checkEncode() {
+ if (!validBuffer) {
+ encode();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ActiveMQBuffer getBodyBuffer() {
+ // if using the writable buffer, we must parse properties
+ checkProperties();
+
+ internalWritableBuffer();
+
+ return writableBuffer;
+ }
+
+ private void internalWritableBuffer() {
+ if (writableBuffer == null) {
+ writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
+ if (endOfBodyPosition > 0) {
+ writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET);
+ writableBuffer.resetReaderIndex();
+ }
+ }
+ }
+
+ public int getEndOfBodyPosition() {
+ if (endOfBodyPosition < 0) {
+ endOfBodyPosition = getBodyBuffer().writerIndex();
+ }
+ return endOfBodyPosition;
+ }
+
+
+ public TypedProperties getTypedProperties() {
+ return checkProperties();
+ }
+
+
+ @Override
+ public void messageChanged() {
+ validBuffer = false;
+ }
+
+ protected CoreMessage(CoreMessage other) {
+ this(other, other.properties);
+ }
+
+ public CoreMessage(long id, int bufferSize) {
+ this.initBuffer(bufferSize);
+ this.setMessageID(id);
+ }
+
+ protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
+ this.body = other.body;
+ this.endOfBodyPosition = other.endOfBodyPosition;
+ this.messageID = other.messageID;
+ this.address = other.address;
+ this.type = other.type;
+ this.durable = other.durable;
+ this.expiration = other.expiration;
+ this.timestamp = other.timestamp;
+ this.priority = other.priority;
+ this.userID = other.userID;
+ if (copyProperties != null) {
+ this.properties = new TypedProperties(copyProperties);
+ }
+ if (other.buffer != null) {
+ this.buffer = other.buffer.copy();
+ }
+ }
+
+ @Override
+ public void copyHeadersAndProperties(final Message msg) {
+ messageID = msg.getMessageID();
+ address = msg.getAddressSimpleString();
+ userID = (UUID)msg.getUserID();
+ type = msg.getType();
+ durable = msg.isDurable();
+ expiration = msg.getExpiration();
+ timestamp = msg.getTimestamp();
+ priority = msg.getPriority();
+
+ if (msg instanceof CoreMessage) {
+ properties = ((CoreMessage)msg).getTypedProperties();
+ } else {
+ // TODO-now: copy stuff
+ logger.warn("Must implement copyHeaderAndProperties for other messages");
+ }
+ }
+
+
+ @Override
+ public Message copy() {
+ return new CoreMessage(this);
+ }
+
+ @Override
+ public Message copy(long newID) {
+ return copy().setMessageID(newID);
+ }
+
+ @Override
+ public long getExpiration() {
+ return expiration;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public CoreMessage setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ @Override
+ public long getMessageID() {
+ return messageID;
+ }
+
+ @Override
+ public byte getPriority() {
+ return priority;
+ }
+
+ @Override
+ public UUID getUserID() {
+ return userID;
+ }
+
+ @Override
+ public CoreMessage setUserID(Object uuid) {
+ this.userID = (UUID)uuid;
+ return this;
+ }
+
+ @Override
+ public CoreMessage setMessageID(long messageID) {
+ this.messageID = messageID;
+ if (messageIDPosition >= 0 && validBuffer) {
+ buffer.setLong(messageIDPosition, messageID);
+ }
+ return this;
+ }
+
+ @Override
+ public CoreMessage setAddress(SimpleString address) {
+ if (validBuffer && !address.equals(this.address)) {
+ messageChanged();
+ }
+ this.address = address;
+ return this;
+ }
+
+ @Override
+ public SimpleString getAddressSimpleString() {
+ return address;
+ }
+
+
+ @Override
+ public CoreMessage setExpiration(long expiration) {
+ this.expiration = expiration;
+ return this;
+ }
+
+ @Override
+ public CoreMessage setPriority(byte priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public CoreMessage setUserID(UUID userID) {
+ this.userID = userID;
+ return this;
+ }
+
+ /**
+ * I am keeping this synchronized as the decode of the Properties is lazy
+ */
+ protected TypedProperties checkProperties() {
+ if (properties == null) {
+ TypedProperties properties = new TypedProperties();
+ if (buffer != null && propertiesLocation >= 0) {
+ properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+ }
+ this.properties = properties;
+ }
+
+ return this.properties;
+ }
+
+
+ @Override
+ public int getMemoryEstimate() {
+ if (memoryEstimate == -1) {
+ if (buffer == null) {
+ new Exception("It is null").printStackTrace();
+ }
+ if (properties == null) {
+ new Exception("Properties It is null").printStackTrace();
+ }
+ memoryEstimate = memoryOffset +
+ (buffer != null ? buffer.capacity() : 0) +
+ (properties != null ? properties.getMemoryOffset() : 0);
+ }
+
+ return memoryEstimate;
+ }
+
+ @Override
+ public byte getType() {
+ return type;
+ }
+
+ @Override
+ public CoreMessage setType(byte type) {
+ this.type = type;
+ return this;
+ }
+
+ private void decode() {
+ endOfBodyPosition = buffer.readInt();
+
+ buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
+
+ decodeHeadersAndProperties(buffer, true);
+ buffer.readerIndex(0);
+
+ internalWritableBuffer();
+ }
+
+
+ public void decodeHeadersAndProperties(final ByteBuf buffer) {
+ decodeHeadersAndProperties(buffer, false);
+ }
+
+ private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+ messageIDPosition = buffer.readerIndex();
+ messageID = buffer.readLong();
+
+ address = SimpleString.readNullableSimpleString(buffer);
+ if (buffer.readByte() == DataConstants.NOT_NULL) {
+ byte[] bytes = new byte[16];
+ buffer.readBytes(bytes);
+ userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
+ } else {
+ userID = null;
+ }
+ type = buffer.readByte();
+ durable = buffer.readBoolean();
+ expiration = buffer.readLong();
+ timestamp = buffer.readLong();
+ priority = buffer.readByte();
+ if (lazyProperties) {
+ properties = null;
+ propertiesLocation = buffer.readerIndex();
+ } else {
+ properties = new TypedProperties();
+ properties.decode(buffer);
+ }
+ }
+
+
+ public synchronized CoreMessage encode() {
+
+ checkProperties();
+
+ if (writableBuffer != null) {
+ // The message encode takes into consideration the PacketImpl which is not part of this encoding
+ // so we always need to take the BUFFER_HEADER_SPACE from packet impl into consideration
+ endOfBodyPosition = writableBuffer.writerIndex() + BUFFER_HEADER_SPACE - 4;
+ } else if (endOfBodyPosition <= 0) {
+ endOfBodyPosition = BUFFER_HEADER_SPACE;
+ }
+
+ buffer.setIndex(0, 0);
+ buffer.writeInt(endOfBodyPosition);
+
+ // The end of body position
+ buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+
+ encodeHeadersAndProperties(buffer);
+
+ validBuffer = true;
+
+ return this;
+ }
+
+ public void encodeHeadersAndProperties(final ByteBuf buffer) {
+ checkProperties();
+ messageIDPosition = buffer.writerIndex();
+ buffer.writeLong(messageID);
+ SimpleString.writeNullableSimpleString(buffer, address);
+ if (userID == null) {
+ buffer.writeByte(DataConstants.NULL);
+ } else {
+ buffer.writeByte(DataConstants.NOT_NULL);
+ buffer.writeBytes(userID.asBytes());
+ }
+ buffer.writeByte(type);
+ buffer.writeBoolean(durable);
+ buffer.writeLong(expiration);
+ buffer.writeLong(timestamp);
+ buffer.writeByte(priority);
+ properties.encode(buffer);
+ }
+
+ @Override
+ public Object getBody() {
+
+ if (body == null) {
+ decodeBody();
+ }
+
+ return body;
+ }
+
+ private void decodeBody() {
+ buffer.readerIndex(DataConstants.SIZE_INT);
+ switch (getBodyType()) {
+ case Text:
+ body = SimpleString.readNullableSimpleString(buffer);
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ public int getHeadersAndPropertiesEncodeSize() {
+ return DataConstants.SIZE_LONG + // Message ID
+ DataConstants.SIZE_BYTE + // user id null?
+ (userID == null ? 0 : 16) +
+ /* address */SimpleString.sizeofNullableString(address) +
+ DataConstants./* Type */SIZE_BYTE +
+ DataConstants./* Durable */SIZE_BOOLEAN +
+ DataConstants./* Expiration */SIZE_LONG +
+ DataConstants./* Timestamp */SIZE_LONG +
+ DataConstants./* Priority */SIZE_BYTE +
+ /* PropertySize and Properties */checkProperties().getEncodeSize();
+ }
+
+ @Override
+ public BodyType getBodyType() {
+ return getBodyType(type);
+ }
+
+ public static BodyType getBodyType(byte type) {
+ switch (type) {
+
+ case Message.DEFAULT_TYPE:
+ return BodyType.Undefined;
+
+ case Message.OBJECT_TYPE:
+ return BodyType.Object;
+
+ case Message.TEXT_TYPE:
+ return BodyType.Text;
+
+ case Message.BYTES_TYPE:
+ return BodyType.Text;
+
+ case Message.MAP_TYPE:
+ return BodyType.Map;
+
+ case Message.STREAM_TYPE:
+ return BodyType.Stream;
+
+ default:
+ return BodyType.Undefined;
+
+ }
+ }
+
+ @Override
+ public int getEncodeSize() {
+ checkEncode();
+ return buffer == null ? -1 : buffer.writerIndex();
+ }
+
+ @Override
+ public CoreMessage setBody(final BodyType bodyType, Object body) {
+ messageChanged();
+
+ this.type = Message.TEXT_TYPE;
+ this.body = body;
+
+ return this;
+ }
+
+ @Override
+ public boolean isLargeMessage() {
+ return false;
+ }
+
+ private void encodeBody(ByteBuf intoBuffer) {
+ intoBuffer.writerIndex(DataConstants.SIZE_INT);
+
+ switch (getBodyType()) {
+
+ // TODO-now implement other types
+ case Text:
+ SimpleString.writeNullableSimpleString(intoBuffer, SimpleString.toSimpleString(body == null ? null : body.toString()));
+ break;
+
+ default:
+ break;
+ }
+
+
+ endOfBodyPosition = buffer.writerIndex() + BUFFER_HEADER_SPACE;
+ buffer.setInt(0, endOfBodyPosition);
+ }
+
+ @Override
+ public String getAddress() {
+ if (address == null) {
+ return null;
+ } else {
+ return address.toString();
+ }
+ }
+
+ @Override
+ public CoreMessage setAddress(String address) {
+ messageChanged();
+ this.address = SimpleString.toSimpleString(address);
+ return this;
+ }
+
+ @Override
+ public CoreMessage setBuffer(ByteBuf buffer) {
+ this.buffer = buffer;
+
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public boolean isDurable() {
+ return durable;
+ }
+
+ @Override
+ public CoreMessage setDurable(boolean durable) {
+ messageChanged();
+ this.durable = durable;
+ return this;
+ }
+
+
+ @Override
+ public CoreMessage putBooleanProperty(final String key, final boolean value) {
+ messageChanged();
+ checkProperties();
+ properties.putBooleanProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) {
+ messageChanged();
+ checkProperties();
+ properties.putBooleanProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getBooleanProperty(key);
+ }
+
+ @Override
+ public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getBooleanProperty(new SimpleString(key));
+ }
+
+
+ @Override
+ public CoreMessage putByteProperty(final SimpleString key, final byte value) {
+ messageChanged();
+ checkProperties();
+ properties.putByteProperty(key, value);
+ return this;
+ }
+
+
+ @Override
+ public CoreMessage putByteProperty(final String key, final byte value) {
+ messageChanged();
+ checkProperties();
+ properties.putByteProperty(new SimpleString(key), value);
+
+ return this;
+ }
+
+
+ @Override
+ public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getByteProperty(key);
+ }
+
+ @Override
+ public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
+ return getByteProperty(SimpleString.toSimpleString(key));
+ }
+
+ @Override
+ public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) {
+ messageChanged();
+ checkProperties();
+ properties.putBytesProperty(key, value);
+
+ return this;
+ }
+
+ @Override
+ public CoreMessage putBytesProperty(final String key, final byte[] value) {
+ messageChanged();
+ checkProperties();
+ properties.putBytesProperty(new SimpleString(key), value);
+ return this;
+ }
+
+
+ @Override
+ public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getBytesProperty(key);
+ }
+
+ @Override
+ public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
+ return getBytesProperty(new SimpleString(key));
+ }
+
+ @Override
+ public CoreMessage putCharProperty(SimpleString key, char value) {
+ messageChanged();
+ checkProperties();
+ properties.putCharProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putCharProperty(String key, char value) {
+ messageChanged();
+ checkProperties();
+ properties.putCharProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putShortProperty(final SimpleString key, final short value) {
+ messageChanged();
+ checkProperties();
+ properties.putShortProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putShortProperty(final String key, final short value) {
+ messageChanged();
+ checkProperties();
+ properties.putShortProperty(new SimpleString(key), value);
+ return this;
+ }
+
+
+ @Override
+ public CoreMessage putIntProperty(final SimpleString key, final int value) {
+ messageChanged();
+ checkProperties();
+ properties.putIntProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putIntProperty(final String key, final int value) {
+ messageChanged();
+ checkProperties();
+ properties.putIntProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getIntProperty(key);
+ }
+
+ @Override
+ public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
+ return getIntProperty(SimpleString.toSimpleString(key));
+ }
+
+
+ @Override
+ public CoreMessage putLongProperty(final SimpleString key, final long value) {
+ messageChanged();
+ checkProperties();
+ properties.putLongProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putLongProperty(final String key, final long value) {
+ messageChanged();
+ checkProperties();
+ properties.putLongProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getLongProperty(key);
+ }
+
+ @Override
+ public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return getLongProperty(SimpleString.toSimpleString(key));
+ }
+
+
+ @Override
+ public CoreMessage putFloatProperty(final SimpleString key, final float value) {
+ messageChanged();
+ checkProperties();
+ properties.putFloatProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putFloatProperty(final String key, final float value) {
+ messageChanged();
+ checkProperties();
+ properties.putFloatProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putDoubleProperty(final SimpleString key, final double value) {
+ messageChanged();
+ checkProperties();
+ properties.putDoubleProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putDoubleProperty(final String key, final double value) {
+ messageChanged();
+ checkProperties();
+ properties.putDoubleProperty(new SimpleString(key), value);
+ return this;
+ }
+
+
+ @Override
+ public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ messageChanged();
+ checkProperties();
+ return properties.getDoubleProperty(key);
+ }
+
+ @Override
+ public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return getDoubleProperty(SimpleString.toSimpleString(key));
+ }
+
+ @Override
+ public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) {
+ messageChanged();
+ checkProperties();
+ properties.putSimpleStringProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public CoreMessage putStringProperty(final String key, final String value) {
+ messageChanged();
+ checkProperties();
+ properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
+ return this;
+ }
+
+ @Override
+ public CoreMessage putObjectProperty(final SimpleString key,
+ final Object value) throws ActiveMQPropertyConversionException {
+ messageChanged();
+ checkProperties();
+ TypedProperties.setObjectProperty(key, value, properties);
+ return this;
+ }
+
+ @Override
+ public Object getObjectProperty(final String key) {
+ checkProperties();
+ return getObjectProperty(SimpleString.toSimpleString(key));
+ }
+
+ @Override
+ public Object getObjectProperty(final SimpleString key) {
+ checkProperties();
+ return properties.getProperty(key);
+ }
+
+ @Override
+ public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
+ messageChanged();
+ putObjectProperty(new SimpleString(key), value);
+ return this;
+ }
+
+ @Override
+ public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getShortProperty(key);
+ }
+
+ @Override
+ public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getShortProperty(new SimpleString(key));
+ }
+
+ @Override
+ public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getFloatProperty(key);
+ }
+
+ @Override
+ public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getFloatProperty(new SimpleString(key));
+ }
+
+ @Override
+ public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ SimpleString str = getSimpleStringProperty(key);
+
+ if (str == null) {
+ return null;
+ } else {
+ return str.toString();
+ }
+ }
+
+ @Override
+ public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
+ return getStringProperty(new SimpleString(key));
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getSimpleStringProperty(key);
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
+ checkProperties();
+ return properties.getSimpleStringProperty(new SimpleString(key));
+ }
+
+ @Override
+ public Object removeProperty(final SimpleString key) {
+ checkProperties();
+ Object oldValue = properties.removeProperty(key);
+ if (oldValue != null) {
+ messageChanged();
+ }
+ return oldValue;
+ }
+
+ @Override
+ public Object removeProperty(final String key) {
+ messageChanged();
+ checkProperties();
+ Object oldValue = properties.removeProperty(new SimpleString(key));
+ if (oldValue != null) {
+ messageChanged();
+ }
+ return oldValue;
+ }
+
+ @Override
+ public boolean containsProperty(final SimpleString key) {
+ checkProperties();
+ return properties.containsProperty(key);
+ }
+
+ @Override
+ public boolean containsProperty(final String key) {
+ checkProperties();
+ return properties.containsProperty(new SimpleString(key));
+ }
+
+ @Override
+ public Set<SimpleString> getPropertyNames() {
+ checkProperties();
+ return properties.getPropertyNames();
+ }
+
+ @Override
+ public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+ return new DecodingContext();
+ }
+
+ private final class DecodingContext implements LargeBodyEncoder {
+
+ private int lastPos = 0;
+
+ private DecodingContext() {
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public long getLargeBodySize() {
+ return buffer.writerIndex();
+ }
+
+ @Override
+ public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
+ ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
+
+ @Override
+ public int encode(final ActiveMQBuffer bufferOut, final int size) {
+ bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
+ lastPos += size;
+ return size;
+ }
+ }
+
+ @Override
+ public int getPersistSize() {
+ checkEncode();
+ return buffer.writerIndex() + DataConstants.SIZE_INT;
+ }
+
+ @Override
+ public void persist(ActiveMQBuffer targetRecord) {
+ targetRecord.writeInt(buffer.writerIndex());
+ targetRecord.writeBytes(buffer, 0, buffer.writerIndex());
+ }
+
+ @Override
+ public void reloadPersistence(ActiveMQBuffer record) {
+ int size = record.readInt();
+ initBuffer(size);
+ buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
+ decode();
+
+ }
+
+ @Override
+ public Message toCore() {
+ return this;
+ }
+
+
+
+ @Override
+ public String toString() {
+ try {
+ return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
+ ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
+ } catch (Throwable e) {
+ return "ServerMessage[messageID=" + messageID + "]";
+ }
+ }
+
+
+ private static String toDate(long timestamp) {
+ if (timestamp == 0) {
+ return "0";
+ } else {
+ return new java.util.Date(timestamp).toString();
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
new file mode 100644
index 0000000..ddf39d2
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class CoreMessagePersister implements Persister<Message> {
+
+ public static CoreMessagePersister theInstance = new CoreMessagePersister();
+
+ public static CoreMessagePersister getInstance() {
+ return theInstance;
+ }
+
+ protected CoreMessagePersister() {
+ }
+
+
+ @Override
+ public int getEncodeSize(Message record) {
+ return DataConstants.SIZE_BYTE + record.getPersistSize() +
+ SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG;
+ }
+
+
+ /** Sub classes must add the first short as the protocol-id */
+ @Override
+ public void encode(ActiveMQBuffer buffer, Message record) {
+ buffer.writeByte((byte)1);
+ buffer.writeLong(record.getMessageID());
+ buffer.writeNullableSimpleString(record.getAddressSimpleString());
+ record.persist(buffer);
+ }
+
+
+ @Override
+ public Message decode(ActiveMQBuffer buffer, Message record) {
+ // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
+ long id = buffer.readLong();
+ SimpleString address = buffer.readNullableSimpleString();
+ record = new CoreMessage();
+ record.reloadPersistence(buffer);
+ record.setMessageID(id);
+ record.setAddress(address);
+ return record;
+ }
+}