You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:38 UTC
[20/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 80116ed..c7a831b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.api.core;
+import java.io.InputStream;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.activemq.artemis.utils.UUID;
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.persistence.Persister;
/**
* A Message is a routable instance that has a payload.
@@ -48,9 +51,41 @@ import org.apache.activemq.artemis.utils.UUID;
* <p>
* If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a
* {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown.
+ *
+ *
+ * User cases that will be covered by Message
+ *
+ * Receiving a buffer:
+ *
+ * Message encode = new CoreMessage(); // or any other implementation
+ * encode.receiveBuffer(buffer);
+ *
+ *
+ * Sending to a buffer:
+ *
+ * Message encode;
+ * size = encode.getEncodeSize();
+ * encode.encodeDirectly(bufferOutput);
+ *
*/
public interface Message {
+ // This is an estimate of how much memory a Message takes up, exclusing body and properties
+ // Note, it is only an estimate, it's not possible to be entirely sure with Java
+ // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+ // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+ int memoryOffset = 352;
+
+
+ SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
+
+ SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
+
+ SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
+
+ // used by the bridges to set duplicates
+ SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
+
/**
* the actual time the message was expired.
* * *
@@ -129,6 +164,91 @@ public interface Message {
byte STREAM_TYPE = 6;
+
+ default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
+ Object obj = getDeliveryAnnotationProperty(property);
+ if (obj instanceof SimpleString) {
+ return (SimpleString)obj;
+ } else {
+ return SimpleString.toSimpleString(obj.toString());
+ }
+ }
+
+ default void cleanupInternalProperties() {
+ // only on core
+ }
+
+ RoutingType getRouteType();
+
+ boolean containsDeliveryAnnotationProperty(SimpleString property);
+
+ /**
+ * @deprecated do not use this, use through ICoreMessage or ClientMessage
+ */
+ @Deprecated
+ default InputStream getBodyInputStream() {
+ return null;
+ }
+
+ /**
+ * @deprecated do not use this, use through ICoreMessage or ClientMessage
+ */
+ @Deprecated
+ default ActiveMQBuffer getBodyBuffer() {
+ return null;
+ }
+
+ /**
+ * @deprecated do not use this, use through ICoreMessage or ClientMessage
+ */
+ @Deprecated
+ default byte getType() {
+ return (byte)0;
+ }
+
+ /**
+ * @deprecated do not use this, use through ICoreMessage or ClientMessage
+ */
+ @Deprecated
+ default Message setType(byte type) {
+ return this;
+ }
+
+
+ void messageChanged();
+
+ /** Used to calculate what is the delivery time.
+ * Return null if not scheduled. */
+ Long getScheduledDeliveryTime();
+
+ default Message setScheduledDeliveryTime(Long time) {
+ return this;
+ }
+
+ /** Context can be used by the application server to inject extra control, like a protocol specific on the server.
+ * There is only one per Object, use it wisely!
+ *
+ * Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adidn a ServerPojo
+ * */
+ RefCountMessageListener getContext();
+
+ SimpleString getReplyTo();
+
+ Message setReplyTo(SimpleString address);
+
+ Message setContext(RefCountMessageListener context);
+
+ /** The buffer will belong to this message, until release is called. */
+ Message setBuffer(ByteBuf buffer);
+
+ ByteBuf getBuffer();
+
+ /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+ Message copy();
+
+ /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+ Message copy(long newID);
+
/**
* Returns the messageID.
* <br>
@@ -136,39 +256,45 @@ public interface Message {
*/
long getMessageID();
+ Message setMessageID(long id);
+
+ default boolean isLargeMessage() {
+ return false;
+ }
+
/**
- * Returns the userID - this is an optional user specified UUID that can be set to identify the message
- * and will be passed around with the message
- *
- * @return the user id
+ * Returns the expiration time of this message.
*/
- UUID getUserID();
+ long getExpiration();
/**
- * Sets the user ID
+ * Sets the expiration of this message.
*
- * @param userID
+ * @param expiration expiration time
*/
- Message setUserID(UUID userID);
+ Message setExpiration(long expiration);
/**
- * Returns the address this message is sent to.
+ * Returns whether this message is expired or not.
*/
- SimpleString getAddress();
+ default boolean isExpired() {
+ if (getExpiration() == 0) {
+ return false;
+ }
+
+ return System.currentTimeMillis() - getExpiration() >= 0;
+ }
+
/**
- * Sets the address to send this message to.
+ * Returns the userID - this is an optional user specified UUID that can be set to identify the message
+ * and will be passed around with the message
*
- * @param address address to send the message to
+ * @return the user id
*/
- Message setAddress(SimpleString address);
+ Object getUserID();
- /**
- * Returns this message type.
- * <p>
- * See fields {@literal *_TYPE} for possible values.
- */
- byte getType();
+ Message setUserID(Object userID);
/**
* Returns whether this message is durable or not.
@@ -182,36 +308,18 @@ public interface Message {
*/
Message setDurable(boolean durable);
- /**
- * Returns the expiration time of this message.
- */
- long getExpiration();
+ Persister<Message> getPersister();
- /**
- * Returns whether this message is expired or not.
- */
- boolean isExpired();
+ String getAddress();
- /**
- * Sets the expiration of this message.
- *
- * @param expiration expiration time
- */
- Message setExpiration(long expiration);
+ Message setAddress(String address);
+
+ SimpleString getAddressSimpleString();
+
+ Message setAddress(SimpleString address);
- /**
- * Returns the message timestamp.
- * <br>
- * The timestamp corresponds to the time this message
- * was handled by an ActiveMQ Artemis server.
- */
long getTimestamp();
- /**
- * Sets the message timestamp.
- *
- * @param timestamp timestamp
- */
Message setTimestamp(long timestamp);
/**
@@ -230,164 +338,116 @@ public interface Message {
*/
Message setPriority(byte priority);
- /**
- * Returns the size of the <em>encoded</em> message.
- */
- int getEncodeSize();
+ /** Used to receive this message from an encoded medium buffer */
+ void receiveBuffer(ByteBuf buffer);
- /**
- * Returns whether this message is a <em>large message</em> or a regular message.
- */
- boolean isLargeMessage();
+ /** Used to send this message to an encoded medium buffer.
+ * @param buffer the buffer used.
+ * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */
+ void sendBuffer(ByteBuf buffer, int deliveryCount);
- /**
- * Returns the message body as an ActiveMQBuffer
- */
- ActiveMQBuffer getBodyBuffer();
+ int getPersistSize();
- /**
- * Writes the input byte array to the message body ActiveMQBuffer
- */
- Message writeBodyBufferBytes(byte[] bytes);
+ void persist(ActiveMQBuffer targetRecord);
- /**
- * Writes the input String to the message body ActiveMQBuffer
- */
- Message writeBodyBufferString(String string);
+ void reloadPersistence(ActiveMQBuffer record);
- /**
- * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
- * of this buffer should not impact the underlying buffer.
- */
- ActiveMQBuffer getBodyBufferDuplicate();
+ default void releaseBuffer() {
+ ByteBuf buffer = getBuffer();
+ if (buffer != null) {
+ buffer.release();
+ }
+ setBuffer(null);
+ }
+ default void referenceOriginalMessage(final Message original, String originalQueue) {
+ String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
- // Properties
- // -----------------------------------------------------------------
+ if (queueOnMessage != null) {
+ putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+ } else if (originalQueue != null) {
+ putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+ }
- /**
- * Puts a boolean property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putBooleanProperty(SimpleString key, boolean value);
+ if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+ putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
- /**
- * @see #putBooleanProperty(SimpleString, boolean)
- */
- Message putBooleanProperty(String key, boolean value);
+ putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+ } else {
+ putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
- /**
- * Puts a byte property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putByteProperty(SimpleString key, byte value);
+ putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+ }
- /**
- * @see #putByteProperty(SimpleString, byte)
- */
- Message putByteProperty(String key, byte value);
+ // reset expiry
+ setExpiration(0);
+ }
/**
- * Puts a byte[] property in this message.
- *
- * @param key property name
- * @param value property value
+ * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+ * @return
*/
- Message putBytesProperty(SimpleString key, byte[] value);
+ default byte[] getDuplicateIDBytes() {
+ Object duplicateID = getDuplicateProperty();
- /**
- * @see #putBytesProperty(SimpleString, byte[])
- */
- Message putBytesProperty(String key, byte[] value);
+ if (duplicateID == null) {
+ return null;
+ } else {
+ if (duplicateID instanceof SimpleString) {
+ return ((SimpleString) duplicateID).getData();
+ } else if (duplicateID instanceof String) {
+ return new SimpleString(duplicateID.toString()).getData();
+ } else {
+ return (byte[]) duplicateID;
+ }
+ }
+ }
/**
- * Puts a short property in this message.
- *
- * @param key property name
- * @param value property value
+ * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+ * @return
*/
- Message putShortProperty(SimpleString key, short value);
+ default Object getDuplicateProperty() {
+ return getDeliveryAnnotationProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+ }
- /**
- * @see #putShortProperty(SimpleString, short)
- */
- Message putShortProperty(String key, short value);
- /**
- * Puts a char property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putCharProperty(SimpleString key, char value);
+ Message putBooleanProperty(String key, boolean value);
- /**
- * @see #putCharProperty(SimpleString, char)
- */
- Message putCharProperty(String key, char value);
+ Message putByteProperty(String key, byte value);
- /**
- * Puts an int property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putIntProperty(SimpleString key, int value);
+ Message putBytesProperty(String key, byte[] value);
- /**
- * @see #putIntProperty(SimpleString, int)
- */
- Message putIntProperty(String key, int value);
+ Message putShortProperty(String key, short value);
- /**
- * Puts a long property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putLongProperty(SimpleString key, long value);
+ Message putCharProperty(String key, char value);
- /**
- * @see #putLongProperty(SimpleString, long)
- */
- Message putLongProperty(String key, long value);
+ Message putIntProperty(String key, int value);
- /**
- * Puts a float property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putFloatProperty(SimpleString key, float value);
+ Message putLongProperty(String key, long value);
- /**
- * @see #putFloatProperty(SimpleString, float)
- */
Message putFloatProperty(String key, float value);
- /**
- * Puts a double property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putDoubleProperty(SimpleString key, double value);
-
- /**
- * @see #putDoubleProperty(SimpleString, double)
- */
Message putDoubleProperty(String key, double value);
- /**
- * Puts a SimpleString property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putStringProperty(SimpleString key, SimpleString value);
+
+
+ Message putBooleanProperty(SimpleString key, boolean value);
+
+ Message putByteProperty(SimpleString key, byte value);
+
+ Message putBytesProperty(SimpleString key, byte[] value);
+
+ Message putShortProperty(SimpleString key, short value);
+
+ Message putCharProperty(SimpleString key, char value);
+
+ Message putIntProperty(SimpleString key, int value);
+
+ Message putLongProperty(SimpleString key, long value);
+
+ Message putFloatProperty(SimpleString key, float value);
+
+ Message putDoubleProperty(SimpleString key, double value);
/**
* Puts a String property in this message.
@@ -397,202 +457,127 @@ public interface Message {
*/
Message putStringProperty(String key, String value);
- /**
- * Puts an Object property in this message. <br>
- * Accepted types are:
- * <ul>
- * <li>Boolean</li>
- * <li>Byte</li>
- * <li>Short</li>
- * <li>Character</li>
- * <li>Integer</li>
- * <li>Long</li>
- * <li>Float</li>
- * <li>Double</li>
- * <li>String</li>
- * <li>SimpleString</li>
- * </ul>
- * Using any other type will throw a PropertyConversionException.
- *
- * @param key property name
- * @param value property value
- * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property
- * types.
- */
- Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #putObjectProperty(SimpleString, Object)
- */
Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException;
- /**
- * Removes the property corresponding to the specified key.
- *
- * @param key property name
- * @return the value corresponding to the specified key or @{code null}
- */
- Object removeProperty(SimpleString key);
+ Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
- /**
- * @see #removeProperty(SimpleString)
- */
Object removeProperty(String key);
- /**
- * Returns {@code true} if this message contains a property with the given key, {@code false} else.
- *
- * @param key property name
- */
- boolean containsProperty(SimpleString key);
-
- /**
- * @see #containsProperty(SimpleString)
- */
boolean containsProperty(String key);
- /**
- * Returns the property corresponding to the specified key as a Boolean.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean
- */
- Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getBooleanProperty(SimpleString)
- */
Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Byte.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte
- */
- Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getByteProperty(SimpleString)
- */
Byte getByteProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Double.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double
- */
- Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getDoubleProperty(SimpleString)
- */
Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as an Integer.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer
- */
- Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getIntProperty(SimpleString)
- */
Integer getIntProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Long.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long
- */
- Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getLongProperty(SimpleString)
- */
Long getLongProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key
- */
+ Object getObjectProperty(String key);
+
+ Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
+
+ Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
+
+ String getStringProperty(String key) throws ActiveMQPropertyConversionException;
+
+ SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
+
+ byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+
+ Object removeProperty(SimpleString key);
+
+ boolean containsProperty(SimpleString key);
+
+ Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
Object getObjectProperty(SimpleString key);
- /**
- * @see #getBooleanProperty(SimpleString)
- */
- Object getObjectProperty(String key);
+ Object removeDeliveryAnnoationProperty(SimpleString key);
- /**
- * Returns the property corresponding to the specified key as a Short.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short
- */
- Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+ Object getDeliveryAnnotationProperty(SimpleString key);
- /**
- * @see #getShortProperty(SimpleString)
- */
- Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
+ Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Float.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float
- */
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
- /**
- * @see #getFloatProperty(SimpleString)
- */
- Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
-
- /**
- * Returns the property corresponding to the specified key as a String.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a String
- */
String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
- /**
- * @see #getStringProperty(SimpleString)
- */
- String getStringProperty(String key) throws ActiveMQPropertyConversionException;
-
- /**
- * Returns the property corresponding to the specified key as a SimpleString.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString
- */
SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
- /**
- * @see #getSimpleStringProperty(SimpleString)
- */
- SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
-
- /**
- * Returns the property corresponding to the specified key as a byte[].
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[]
- */
byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+ Message putStringProperty(SimpleString key, SimpleString value);
+
/**
- * @see #getBytesProperty(SimpleString)
+ * Returns the size of the <em>encoded</em> message.
*/
- byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+ int getEncodeSize();
/**
* Returns all the names of the properties for this message.
*/
Set<SimpleString> getPropertyNames();
+
+
+ int getRefCount();
+
+ int incrementRefCount() throws Exception;
+
+ int decrementRefCount() throws Exception;
+
+ int incrementDurableRefCount();
+
+ int decrementDurableRefCount();
+
/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
- Map<String, Object> toMap();
+ default Map<String, Object> toMap() {
+ Map map = toPropertyMap();
+ map.put("messageID", getMessageID());
+ Object userID = getUserID();
+ if (getUserID() != null) {
+ map.put("userID", "ID:" + userID.toString());
+ }
+
+ map.put("address", getAddress());
+ map.put("durable", isDurable());
+ map.put("expiration", getExpiration());
+ map.put("timestamp", getTimestamp());
+ map.put("priority", (int)getPriority());
+
+ return map;
+ }
/**
* @return Returns the message properties in Map form, useful when encoding to JSON
*/
- Map<String, Object> toPropertyMap();
+ default Map<String, Object> toPropertyMap() {
+ Map map = new HashMap<>();
+ for (SimpleString name : getPropertyNames()) {
+ map.put(name.toString(), getObjectProperty(name.toString()));
+ }
+ return map;
+ }
+
+
+ /** This should make you convert your message into Core format. */
+ ICoreMessage toCore();
+
+ int getMemoryEstimate();
+
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
new file mode 100644
index 0000000..64dd44d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class RefCountMessage implements Message {
+
+ private final AtomicInteger durableRefCount = new AtomicInteger();
+
+ private final AtomicInteger refCount = new AtomicInteger();
+
+ private RefCountMessageListener context;
+
+ @Override
+ public Message setContext(RefCountMessageListener context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public RefCountMessageListener getContext() {
+ return context;
+ }
+
+ @Override
+ public int getRefCount() {
+ return refCount.get();
+ }
+
+ @Override
+ public int incrementRefCount() throws Exception {
+ int count = refCount.incrementAndGet();
+ if (context != null) {
+ context.nonDurableUp(this, count);
+ }
+ return count;
+ }
+
+ @Override
+ public int incrementDurableRefCount() {
+ int count = durableRefCount.incrementAndGet();
+ if (context != null) {
+ context.durableUp(this, count);
+ }
+ return count;
+ }
+
+ @Override
+ public int decrementDurableRefCount() {
+ int count = durableRefCount.decrementAndGet();
+ if (context != null) {
+ context.durableDown(this, count);
+ }
+ return count;
+ }
+
+ @Override
+ public int decrementRefCount() throws Exception {
+ int count = refCount.decrementAndGet();
+ if (context != null) {
+ context.nonDurableDown(this, count);
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
new file mode 100644
index 0000000..e68dffd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+/** If {@link Message#getContext()} != null and is implementing this interface.
+ * These methods will be called during refCount operations */
+public interface RefCountMessageListener {
+
+ void durableUp(Message message, int durableCount);
+
+ void durableDown(Message message, int durableCount);
+
+ void nonDurableUp(Message message, int nonDurableCoun);
+
+ void nonDurableDown(Message message, int nonDurableCoun);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
index e87d365..67f2150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
@@ -19,14 +19,15 @@ package org.apache.activemq.artemis.api.core.client;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
/**
* A ClientMessage represents a message sent and/or received by ActiveMQ Artemis.
*/
-public interface ClientMessage extends Message {
+public interface ClientMessage extends ICoreMessage {
/**
* Returns the number of times this message was delivered.
@@ -123,135 +124,141 @@ public interface ClientMessage extends Message {
ClientMessage setBodyInputStream(InputStream bodyInputStream);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Return the bodyInputStream for large messages
+ * @return
+ */
+ @Override
+ InputStream getBodyInputStream();
+
+ /**
+ * The buffer to write the body.
+ * @return
+ */
+ @Override
+ ActiveMQBuffer getBodyBuffer();
+
+ /**
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putBooleanProperty(SimpleString key, boolean value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putBooleanProperty(String key, boolean value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putByteProperty(SimpleString key, byte value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putByteProperty(String key, byte value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putBytesProperty(SimpleString key, byte[] value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putBytesProperty(String key, byte[] value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putShortProperty(SimpleString key, short value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putShortProperty(String key, short value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putCharProperty(SimpleString key, char value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putCharProperty(String key, char value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putIntProperty(SimpleString key, int value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putIntProperty(String key, int value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putLongProperty(SimpleString key, long value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putLongProperty(String key, long value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putFloatProperty(SimpleString key, float value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putFloatProperty(String key, float value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putDoubleProperty(SimpleString key, double value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putDoubleProperty(String key, double value);
/**
- * Overridden from {@link Message} to enable fluent API
- */
- @Override
- ClientMessage putStringProperty(SimpleString key, SimpleString value);
-
- /**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
@Override
ClientMessage putStringProperty(String key, String value);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
- @Override
ClientMessage writeBodyBufferBytes(byte[] bytes);
/**
- * Overridden from {@link Message} to enable fluent API
+ * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
*/
- @Override
ClientMessage writeBodyBufferString(String string);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
new file mode 100644
index 0000000..743583b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+public enum BodyType {
+ Undefined, Bytes, Map, Object, Stream, Text
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
new file mode 100644
index 0000000..f7821b9
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+import java.nio.ByteBuffer;
+
+public interface MessageBody {
+ Object getBody();
+
+ ByteBuffer getBodyArray();
+
+ BodyType getType();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 40211c1..946285d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.management;
import javax.json.JsonArray;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
/**
* Helper class to use ActiveMQ Artemis Core messages to manage server resources.
@@ -86,7 +88,7 @@ public final class ManagementHelper {
* @param attribute the name of the attribute
* @see ResourceNames
*/
- public static void putAttribute(final Message message, final String resourceName, final String attribute) {
+ public static void putAttribute(final ICoreMessage message, final String resourceName, final String attribute) {
message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName));
message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute));
}
@@ -99,7 +101,7 @@ public final class ManagementHelper {
* @param operationName the name of the operation to invoke on the resource
* @see ResourceNames
*/
- public static void putOperationInvocation(final Message message,
+ public static void putOperationInvocation(final ICoreMessage message,
final String resourceName,
final String operationName) throws Exception {
ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[]) null);
@@ -114,7 +116,7 @@ public final class ManagementHelper {
* @param parameters the parameters to use to invoke the server resource
* @see ResourceNames
*/
- public static void putOperationInvocation(final Message message,
+ public static void putOperationInvocation(final ICoreMessage message,
final String resourceName,
final String operationName,
final Object... parameters) throws Exception {
@@ -141,7 +143,7 @@ public final class ManagementHelper {
* Used by ActiveMQ Artemis management service.
*/
public static Object[] retrieveOperationParameters(final Message message) throws Exception {
- SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
+ SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
String jsonString = (sstring == null) ? null : sstring.toString();
if (jsonString != null) {
@@ -170,7 +172,7 @@ public final class ManagementHelper {
/**
* Used by ActiveMQ Artemis management service.
*/
- public static void storeResult(final Message message, final Object result) throws Exception {
+ public static void storeResult(final CoreMessage message, final Object result) throws Exception {
String resultString;
if (result != null) {
@@ -192,7 +194,7 @@ public final class ManagementHelper {
* If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
- public static Object[] getResults(final Message message) throws Exception {
+ public static Object[] getResults(final ICoreMessage message) throws Exception {
SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
String jsonString = (sstring == null) ? null : sstring.toString();
@@ -210,7 +212,7 @@ public final class ManagementHelper {
* If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
- public static Object getResult(final Message message) throws Exception {
+ public static Object getResult(final ICoreMessage message) throws Exception {
return getResult(message, null);
}
@@ -220,7 +222,7 @@ public final class ManagementHelper {
* If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
- public static Object getResult(final Message message, Class desiredType) throws Exception {
+ public static Object getResult(final ICoreMessage message, Class desiredType) throws Exception {
Object[] res = ManagementHelper.getResults(message);
if (res != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 900305f..b5d5474 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -20,18 +20,18 @@ import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
/**
* A ResetLimitWrappedActiveMQBuffer
- * TODO: Move this to commons
*/
public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper {
private final int limit;
- private MessageInternal message;
+ private Message message;
/**
* We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions,
@@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
*
* @param message
*/
- public void setMessage(MessageInternal message) {
+ public void setMessage(Message message) {
this.message = message;
}
- public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
+ public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
}
- public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
+ public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
super(buffer);
@@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
private void changed() {
if (message != null) {
- message.bodyChanged();
+ message.messageChanged();
}
}
@@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
@Override
public void resetReaderIndex() {
- changed();
-
buffer.readerIndex(limit);
}
@@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
super.writeBytes(src);
}
+
+ @Override
+ public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
@Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
changed();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 2b4ab7e..82af968 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -569,7 +569,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private void handleRegularMessage(ClientMessageInternal message) {
if (message.getAddress() == null) {
- message.setAddressTransient(queueInfo.getAddress());
+ message.setAddress(queueInfo.getAddress());
}
message.onReceipt(this);
@@ -625,7 +625,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
currentLargeMessageController.setLocal(true);
//sets the packet
- ActiveMQBuffer qbuff = clMessage.getBodyBuffer();
+ ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index c3cbceb..cbfaf6f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public int getEncodeSize() {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
return super.getEncodeSize();
} else {
return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
@@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
throw new RuntimeException(e.getMessage(), e);
}
- return bodyBuffer;
+ return writableBuffer;
}
@Override
@@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
// The body was rebuilt on the client, so we need to behave as a regular message on this case
super.saveToOutputStream(out);
} else {
@@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
super.setOutputStream(out);
} else {
largeMessageController.setOutputStream(out);
@@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
return super.waitOutputStreamCompletion(timeMilliseconds);
} else {
return largeMessageController.waitCompletion(timeMilliseconds);
@@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
@Override
public void discardBody() {
- if (bodyBuffer != null) {
+ if (writableBuffer != null) {
super.discardBody();
} else {
largeMessageController.discardUnusedPackets();
@@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
}
private void checkBuffer() throws ActiveMQException {
- if (bodyBuffer == null) {
+ if (writableBuffer == null) {
long bodySize = this.largeMessageSize + BODY_OFFSET;
if (bodySize > Integer.MAX_VALUE) {
bodySize = Integer.MAX_VALUE;
}
- createBody((int) bodySize);
+ initBuffer((int) bodySize);
- bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+ writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
- largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer));
+ largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));
}
}
@@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
public void retrieveExistingData(ClientMessageInternal clMessage) {
this.messageID = clMessage.getMessageID();
- this.address = clMessage.getAddress();
+ this.address = clMessage.getAddressSimpleString();
this.setUserID(clMessage.getUserID());
this.setFlowControlSize(clMessage.getFlowControlSize());
this.setDeliveryCount(clMessage.getDeliveryCount());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 7bf8eb7..252ae86 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
/**
* A ClientMessageImpl
*/
-public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal {
+public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
// added this constant here so that the client package have no dependency on JMS
public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
@@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
public ClientMessageImpl() {
}
+ protected ClientMessageImpl(ClientMessageImpl other) {
+ super(other);
+ }
+
+ @Override
+ public ClientMessageImpl setDurable(boolean durable) {
+ super.setDurable(durable);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setExpiration(long expiration) {
+ super.setExpiration(expiration);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setPriority(byte priority) {
+ super.setPriority(priority);
+ return this;
+ }
+
+ @Override
+ public ClientMessageImpl setUserID(UUID userID) {
+
+ return this;
+ }
+
+
/*
* Construct messages before sending
*/
@@ -66,12 +97,13 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
final long timestamp,
final byte priority,
final int initialMessageBufferSize) {
- super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
+ this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
+ setPriority(priority).initBuffer(initialMessageBufferSize);
}
@Override
- public boolean isServerMessage() {
- return false;
+ public TypedProperties getProperties() {
+ return this.checkProperties();
}
@Override
@@ -108,6 +140,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
return this;
}
+
+ @Override
+ public void checkCompletion() throws ActiveMQException {
+ }
+
@Override
public int getFlowControlSize() {
if (flowControlSize < 0) {
@@ -141,7 +178,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public String toString() {
- return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+ return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + getProperties().toString() + "]";
}
@Override
@@ -189,7 +226,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
}
@Override
- public BodyEncoder getBodyEncoder() throws ActiveMQException {
+ public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
return new DecodingContext();
}
@@ -307,15 +344,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) {
- return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+ getBodyBuffer().writeBytes(bytes);
+ return this;
}
@Override
public ClientMessageImpl writeBodyBufferString(String string) {
- return (ClientMessageImpl) super.writeBodyBufferString(string);
+ getBodyBuffer().writeString(string);
+ return this;
}
- private final class DecodingContext implements BodyEncoder {
+ private final class DecodingContext implements LargeBodyEncoder {
private DecodingContext() {
}
@@ -347,9 +386,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public int encode(final ActiveMQBuffer bufferOut, final int size) {
byte[] bytes = new byte[size];
- getWholeBuffer().readBytes(bytes);
+ buffer.readBytes(bytes);
bufferOut.writeBytes(bytes, 0, size);
return size;
}
}
+
+ @Override
+ public Message copy() {
+ return new ClientMessageImpl(this);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 07d4719..4b87878 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.client.impl;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.utils.TypedProperties;
@@ -34,8 +33,6 @@ public interface ClientMessageInternal extends ClientMessage {
*/
void setFlowControlSize(int flowControlSize);
- void setAddressTransient(SimpleString address);
-
void onReceipt(ClientConsumerInternal consumer);
/**
@@ -44,4 +41,5 @@ public interface ClientMessageInternal extends ClientMessage {
void discardBody();
boolean isCompressed();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 1dfbe72..ce4a8a1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -23,12 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
import org.apache.activemq.artemis.utils.DeflaterReader;
@@ -208,7 +208,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
private void doSend(SimpleString sendingAddress,
- final Message msg,
+ final Message msgToSend,
final SendAcknowledgementHandler handler,
final boolean forceAsync) throws ActiveMQException {
if (sendingAddress == null) {
@@ -217,7 +217,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.startCall();
try {
- MessageInternal msgI = (MessageInternal) msg;
+ // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core
+ ICoreMessage msg = msgToSend.toCore();
ClientProducerCredits theCredits;
@@ -225,8 +226,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
// a note about the second check on the writerIndexSize,
// If it's a server's message, it means this is being done through the bridge or some special consumer on the
// server's on which case we can't' convert the message into large at the servers
- if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
- msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) {
+ if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() ||
+ msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
isLarge = true;
} else {
isLarge = false;
@@ -248,27 +249,31 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
if (groupID != null) {
- msgI.putStringProperty(Message.HDR_GROUP_ID, groupID);
+ msg.putStringProperty(Message.HDR_GROUP_ID, groupID);
}
- final boolean sendBlockingConfig = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
+ final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
final boolean forceAsyncOverride = handler != null;
final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride;
session.workDone();
if (isLarge) {
- largeMessageSend(sendBlocking, msgI, theCredits, handler);
+ largeMessageSend(sendBlocking, msg, theCredits, handler);
} else {
- sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler);
+ sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler);
}
} finally {
session.endCall();
}
}
+ private InputStream getBodyInputStream(ICoreMessage msgI) {
+ return msgI.getBodyInputStream();
+ }
+
private void sendRegularMessage(final SimpleString sendingAddress,
- final MessageInternal msgI,
+ final ICoreMessage msgI,
final boolean sendBlocking,
final ClientProducerCredits theCredits,
final SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -301,7 +306,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSend(final boolean sendBlocking,
- final MessageInternal msgI,
+ final ICoreMessage msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking);
@@ -313,22 +318,22 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
// msg.getBody() could be Null on LargeServerMessage
- if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) {
- msgI.getWholeBuffer().readerIndex(0);
+ if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) {
+ msgI.getBuffer().readerIndex(0);
}
InputStream input;
if (msgI.isServerMessage()) {
largeMessageSendServer(sendBlocking, msgI, credits, handler);
- } else if ((input = msgI.getBodyInputStream()) != null) {
+ } else if ((input = getBodyInputStream(msgI)) != null) {
largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
} else {
largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
}
}
- private void sendInitialLargeMessageHeader(MessageInternal msgI,
+ private void sendInitialLargeMessageHeader(Message msgI,
ClientProducerCredits credits) throws ActiveMQException {
int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
@@ -348,17 +353,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendServer(final boolean sendBlocking,
- final MessageInternal msgI,
+ final ICoreMessage msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
sendInitialLargeMessageHeader(msgI, credits);
- BodyEncoder context = msgI.getBodyEncoder();
+ LargeBodyEncoder context = msgI.getBodyEncoder();
final long bodySize = context.getLargeBodySize();
-
- final int reconnectID = sessionContext.getReconnectID();
-
context.open();
try {
@@ -392,7 +394,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendBuffered(final boolean sendBlocking,
- final MessageInternal msgI,
+ final ICoreMessage msgI,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
msgI.getBodyBuffer().readerIndex(0);
@@ -407,7 +409,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
* @throws ActiveMQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final MessageInternal msgI,
+ final ICoreMessage msgI,
final InputStream inputStreamParameter,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -478,7 +480,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
msgI.getBodyBuffer().writeBytes(buff, 0, pos);
- sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler);
+ sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
return;
} else {
if (!headerSent) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+
+ @Override
public ByteBuffer toByteBuffer() {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
+ /**
+ * Transfers the specified source buffer's data to this buffer starting at
+ * the current {@code writerIndex} until the source buffer's position
+ * reaches its limit, and increases the {@code writerIndex} by the
+ * number of the transferred bytes.
+ *
+ * @param src The source buffer
+ * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+ * {@code this.writableBytes}
+ */
+ @Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+ }
+
public int writeBytes(final InputStream in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
deleted file mode 100644
index baafaac..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-/**
- * Class used to encode message body into buffers.
- * <br>
- * Used to send large streams over the wire
- */
-public interface BodyEncoder {
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- void open() throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- void close() throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- int encode(ByteBuffer bufferRead) throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
- /**
- * This method must not be called directly by ActiveMQ Artemis clients.
- */
- long getLargeBodySize();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
new file mode 100644
index 0000000..8b96282
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+
+/**
+ * Class used to encode message body into buffers.
+ * <br>
+ * Used to send large streams over the wire
+ */
+public interface LargeBodyEncoder {
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ void open() throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ void close() throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ int encode(ByteBuffer bufferRead) throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
+
+ /**
+ * This method must not be called directly by ActiveMQ Artemis clients.
+ */
+ long getLargeBodySize();
+}