You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/02 15:05:53 UTC
[19/29] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
deleted file mode 100644
index f93086c..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ /dev/null
@@ -1,1059 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message.impl;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUID;
-
-/**
- * A concrete implementation of a message
- * <p>
- * All messages handled by ActiveMQ Artemis core are of this type
- */
-public abstract class MessageImpl implements MessageInternal {
-
- public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
-
- public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
-
- public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
-
- // used by the bridges to set duplicates
- public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
-
- public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
-
- public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
-
- protected long messageID;
-
- protected SimpleString address;
-
- protected byte type;
-
- protected boolean durable;
-
- /**
- * GMT milliseconds at which this message expires. 0 means never expires *
- */
- private long expiration;
-
- protected long timestamp;
-
- protected TypedProperties properties;
-
- protected byte priority;
-
- protected volatile ActiveMQBuffer buffer;
-
- protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer;
-
- protected volatile boolean bufferValid;
-
- private int endOfBodyPosition = -1;
-
- private int endOfMessagePosition;
-
- private UUID userID;
-
- // Constructors --------------------------------------------------
-
- protected MessageImpl() {
- properties = new TypedProperties();
- }
-
- /**
- * overridden by the client message, we need access to the connection so we can create the appropriate ActiveMQBuffer.
- *
- * @param type
- * @param durable
- * @param expiration
- * @param timestamp
- * @param priority
- * @param initialMessageBufferSize
- */
- protected MessageImpl(final byte type,
- final boolean durable,
- final long expiration,
- final long timestamp,
- final byte priority,
- final int initialMessageBufferSize) {
- this();
- this.type = type;
- this.durable = durable;
- this.expiration = expiration;
- this.timestamp = timestamp;
- this.priority = priority;
- createBody(initialMessageBufferSize);
- }
-
- protected MessageImpl(final int initialMessageBufferSize) {
- this();
- createBody(initialMessageBufferSize);
- }
-
- /*
- * Copy constructor
- */
- protected MessageImpl(final MessageImpl other) {
- this(other, other.getProperties());
- }
-
- /*
- * Copy constructor
- */
- protected MessageImpl(final MessageImpl other, TypedProperties properties) {
- messageID = other.getMessageID();
- userID = other.getUserID();
- address = other.getAddress();
- type = other.getType();
- durable = other.isDurable();
- expiration = other.getExpiration();
- timestamp = other.getTimestamp();
- priority = other.getPriority();
- this.properties = new TypedProperties(properties);
-
- // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
- // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
- // many subscriptions and bridging to other nodes in a cluster
- synchronized (other) {
- bufferValid = false;
- endOfBodyPosition = -1;
- endOfMessagePosition = other.endOfMessagePosition;
-
- if (other.buffer != null) {
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
-
- buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
-
- bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
-
- bodyBuffer.readerIndex(BODY_OFFSET);
- bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
- endOfBodyPosition = other.endOfBodyPosition;
- }
- }
- }
-
- // Message implementation ----------------------------------------
-
- @Override
- public int getEncodeSize() {
- int headersPropsSize = getHeadersAndPropertiesEncodeSize();
-
- int bodyPos = getEndOfBodyPosition();
-
- int bodySize = bodyPos - BUFFER_HEADER_SPACE - DataConstants.SIZE_INT;
-
- return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize;
- }
-
- @Override
- public int getHeadersAndPropertiesEncodeSize() {
- return DataConstants.SIZE_LONG + // Message ID
- DataConstants.SIZE_BYTE + // user id null?
- (userID == null ? 0 : 16) +
- /* address */SimpleString.sizeofNullableString(address) +
- DataConstants./* Type */SIZE_BYTE +
- DataConstants./* Durable */SIZE_BOOLEAN +
- DataConstants./* Expiration */SIZE_LONG +
- DataConstants./* Timestamp */SIZE_LONG +
- DataConstants./* Priority */SIZE_BYTE +
- /* PropertySize and Properties */properties.getEncodeSize();
- }
-
- @Override
- public void encodeHeadersAndProperties(final ActiveMQBuffer buffer) {
- buffer.writeLong(messageID);
- buffer.writeNullableSimpleString(address);
- if (userID == null) {
- buffer.writeByte(DataConstants.NULL);
- } else {
- buffer.writeByte(DataConstants.NOT_NULL);
- buffer.writeBytes(userID.asBytes());
- }
- buffer.writeByte(type);
- buffer.writeBoolean(durable);
- buffer.writeLong(expiration);
- buffer.writeLong(timestamp);
- buffer.writeByte(priority);
- properties.encode(buffer);
- }
-
- @Override
- public void decodeHeadersAndProperties(final ActiveMQBuffer buffer) {
- messageID = buffer.readLong();
- address = buffer.readNullableSimpleString();
- if (buffer.readByte() == DataConstants.NOT_NULL) {
- byte[] bytes = new byte[16];
- buffer.readBytes(bytes);
- userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
- } else {
- userID = null;
- }
- type = buffer.readByte();
- durable = buffer.readBoolean();
- expiration = buffer.readLong();
- timestamp = buffer.readLong();
- priority = buffer.readByte();
- properties.decode(buffer);
- }
-
- public void copyHeadersAndProperties(final MessageInternal msg) {
- messageID = msg.getMessageID();
- address = msg.getAddress();
- userID = msg.getUserID();
- type = msg.getType();
- durable = msg.isDurable();
- expiration = msg.getExpiration();
- timestamp = msg.getTimestamp();
- priority = msg.getPriority();
- properties = msg.getTypedProperties();
- }
-
- @Override
- public ActiveMQBuffer getBodyBuffer() {
- if (bodyBuffer == null) {
- bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
- }
-
- return bodyBuffer;
- }
-
- @Override
- public Message writeBodyBufferBytes(byte[] bytes) {
- getBodyBuffer().writeBytes(bytes);
-
- return this;
- }
-
- @Override
- public Message writeBodyBufferString(String string) {
- getBodyBuffer().writeString(string);
-
- return this;
- }
-
- public void checkCompletion() throws ActiveMQException {
- // no op on regular messages
- }
-
- @Override
- public synchronized ActiveMQBuffer getBodyBufferDuplicate() {
-
- // Must copy buffer before sending it
-
- ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf());
- byteBuf = byteBuf.duplicate();
- byteBuf.readerIndex(getBodyBuffer().readerIndex());
- byteBuf.writerIndex(getBodyBuffer().writerIndex());
-
- return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null);
- }
-
- @Override
- public long getMessageID() {
- return messageID;
- }
-
- @Override
- public UUID getUserID() {
- return userID;
- }
-
- @Override
- public MessageImpl setUserID(final UUID userID) {
- this.userID = userID;
- return this;
- }
-
- /**
- * this doesn't need to be synchronized as setAddress is protecting the buffer,
- * not the address
- */
- @Override
- public SimpleString getAddress() {
- return address;
- }
-
- /**
- * The only reason this is synchronized is because of encoding a message versus invalidating the buffer.
- * This synchronization can probably be removed since setAddress is always called from a single thread.
- * However I will keep it as it's harmless and it's been well tested
- */
- @Override
- public Message setAddress(final SimpleString address) {
- // This is protecting the buffer
- synchronized (this) {
- if (this.address != address) {
- this.address = address;
-
- bufferValid = false;
- }
- }
-
- return this;
- }
-
- @Override
- public byte getType() {
- return type;
- }
-
- public void setType(byte type) {
- this.type = type;
- }
-
- @Override
- public boolean isDurable() {
- return durable;
- }
-
- @Override
- public MessageImpl setDurable(final boolean durable) {
- if (this.durable != durable) {
- this.durable = durable;
-
- bufferValid = false;
- }
- return this;
- }
-
- @Override
- public long getExpiration() {
- return expiration;
- }
-
- @Override
- public MessageImpl setExpiration(final long expiration) {
- if (this.expiration != expiration) {
- this.expiration = expiration;
-
- bufferValid = false;
- }
- return this;
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public MessageImpl setTimestamp(final long timestamp) {
- if (this.timestamp != timestamp) {
- this.timestamp = timestamp;
-
- bufferValid = false;
- }
- return this;
- }
-
- @Override
- public byte getPriority() {
- return priority;
- }
-
- @Override
- public MessageImpl setPriority(final byte priority) {
- if (this.priority != priority) {
- this.priority = priority;
-
- bufferValid = false;
- }
- return this;
- }
-
- @Override
- public boolean isExpired() {
- if (expiration == 0) {
- return false;
- }
-
- return System.currentTimeMillis() - expiration >= 0;
- }
-
- @Override
- public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<>();
-
- map.put("messageID", messageID);
- if (userID != null) {
- map.put("userID", "ID:" + userID.toString());
- }
- map.put("address", address.toString());
- map.put("type", type);
- map.put("durable", durable);
- map.put("expiration", expiration);
- map.put("timestamp", timestamp);
- map.put("priority", priority);
- map.putAll(toPropertyMap());
- return map;
- }
-
- @Override
- public Map<String, Object> toPropertyMap() {
- Map<String, Object> map = new HashMap<>();
- for (SimpleString propName : properties.getPropertyNames()) {
- map.put(propName.toString(), properties.getProperty(propName));
- }
- return map;
- }
-
- @Override
- public void decodeFromBuffer(final ActiveMQBuffer buffer) {
-
- this.buffer = copyMessageBuffer(buffer);
-
- decode();
-
- //synchronize indexes
- buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex());
-
- // Setting up the BodyBuffer based on endOfBodyPosition set from decode
- ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null);
- tmpbodyBuffer.readerIndex(BODY_OFFSET);
- tmpbodyBuffer.writerIndex(endOfBodyPosition);
- // only set this after the writer and reader is set,
- // otherwise the buffer would be reset through the listener
- tmpbodyBuffer.setMessage(this);
- this.bodyBuffer = tmpbodyBuffer;
-
- }
-
- private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) {
- ActiveMQBuffer copiedBuffer;
-
- ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() );
-
- int read = buffer.byteBuf().readerIndex();
- int writ = buffer.byteBuf().writerIndex();
-
- int readArt = buffer.readerIndex();
- int writArt = buffer.writerIndex();
- buffer.byteBuf().readerIndex( 0 );
-
- buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() );
- buffer.byteBuf().setIndex( read, writ );
- newNettyBuffer.setIndex( read, writ );
-
- copiedBuffer = new ChannelBufferWrapper( newNettyBuffer );
-
- buffer.setIndex( readArt, writArt );
- copiedBuffer.setIndex( readArt, writArt );
-
- return copiedBuffer;
- }
-
- @Override
- public void bodyChanged() {
- bufferValid = false;
-
- endOfBodyPosition = -1;
- }
-
- @Override
- public int getEndOfMessagePosition() {
- return endOfMessagePosition;
- }
-
- @Override
- public int getEndOfBodyPosition() {
- if (endOfBodyPosition < 0) {
- endOfBodyPosition = getBodyBuffer().writerIndex();
- }
- return endOfBodyPosition;
- }
-
- // Encode to journal or paging
- public void encode(final ActiveMQBuffer buff) {
- encodeToBuffer();
-
- buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE);
- }
-
- // Decode from journal or paging
- public void decode(final ActiveMQBuffer buff) {
- int start = buff.readerIndex();
-
- endOfBodyPosition = buff.readInt();
-
- endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE + start);
-
- int length = endOfMessagePosition - BUFFER_HEADER_SPACE;
-
- buffer.setIndex(0, BUFFER_HEADER_SPACE);
-
- buffer.writeBytes(buff, start, length);
-
- decode();
-
- buff.readerIndex(start + length);
- }
-
- @Override
- public synchronized ActiveMQBuffer getEncodedBuffer() {
- ActiveMQBuffer buff = encodeToBuffer();
- return buff.duplicate();
- }
-
- @Override
- public void setAddressTransient(final SimpleString address) {
- this.address = address;
- }
-
- // Properties
- // ---------------------------------------------------------------------------------------
-
- @Override
- public Message putBooleanProperty(final SimpleString key, final boolean value) {
- properties.putBooleanProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putByteProperty(final SimpleString key, final byte value) {
- properties.putByteProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putBytesProperty(final SimpleString key, final byte[] value) {
- properties.putBytesProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putCharProperty(SimpleString key, char value) {
- properties.putCharProperty(key, value);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putCharProperty(String key, char value) {
- properties.putCharProperty(new SimpleString(key), value);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putShortProperty(final SimpleString key, final short value) {
- properties.putShortProperty(key, value);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putIntProperty(final SimpleString key, final int value) {
- properties.putIntProperty(key, value);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putLongProperty(final SimpleString key, final long value) {
- properties.putLongProperty(key, value);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putFloatProperty(final SimpleString key, final float value) {
- properties.putFloatProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putDoubleProperty(final SimpleString key, final double value) {
- properties.putDoubleProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putStringProperty(final SimpleString key, final SimpleString value) {
- properties.putSimpleStringProperty(key, value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putObjectProperty(final SimpleString key,
- final Object value) throws ActiveMQPropertyConversionException {
- TypedProperties.setObjectProperty(key, value, properties);
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
- putObjectProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putBooleanProperty(final String key, final boolean value) {
- properties.putBooleanProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putByteProperty(final String key, final byte value) {
- properties.putByteProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putBytesProperty(final String key, final byte[] value) {
- properties.putBytesProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putShortProperty(final String key, final short value) {
- properties.putShortProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putIntProperty(final String key, final int value) {
- properties.putIntProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putLongProperty(final String key, final long value) {
- properties.putLongProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putFloatProperty(final String key, final float value) {
- properties.putFloatProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putDoubleProperty(final String key, final double value) {
- properties.putDoubleProperty(new SimpleString(key), value);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Message putStringProperty(final String key, final String value) {
- properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
-
- bufferValid = false;
-
- return this;
- }
-
- public Message putTypedProperties(final TypedProperties otherProps) {
- properties.putTypedProperties(otherProps);
-
- bufferValid = false;
-
- return this;
- }
-
- @Override
- public Object getObjectProperty(final SimpleString key) {
- return properties.getProperty(key);
- }
-
- @Override
- public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getBooleanProperty(key);
- }
-
- @Override
- public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getBooleanProperty(new SimpleString(key));
- }
-
- @Override
- public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getByteProperty(key);
- }
-
- @Override
- public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getByteProperty(new SimpleString(key));
- }
-
- @Override
- public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getBytesProperty(key);
- }
-
- @Override
- public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
- return getBytesProperty(new SimpleString(key));
- }
-
- @Override
- public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getDoubleProperty(key);
- }
-
- @Override
- public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getDoubleProperty(new SimpleString(key));
- }
-
- @Override
- public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getIntProperty(key);
- }
-
- @Override
- public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getIntProperty(new SimpleString(key));
- }
-
- @Override
- public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getLongProperty(key);
- }
-
- @Override
- public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getLongProperty(new SimpleString(key));
- }
-
- @Override
- public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getShortProperty(key);
- }
-
- @Override
- public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getShortProperty(new SimpleString(key));
- }
-
- @Override
- public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getFloatProperty(key);
- }
-
- @Override
- public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getFloatProperty(new SimpleString(key));
- }
-
- @Override
- public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- SimpleString str = getSimpleStringProperty(key);
-
- if (str == null) {
- return null;
- } else {
- return str.toString();
- }
- }
-
- @Override
- public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
- return getStringProperty(new SimpleString(key));
- }
-
- @Override
- public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
- return properties.getSimpleStringProperty(key);
- }
-
- @Override
- public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
- return properties.getSimpleStringProperty(new SimpleString(key));
- }
-
- @Override
- public Object getObjectProperty(final String key) {
- return properties.getProperty(new SimpleString(key));
- }
-
- @Override
- public Object removeProperty(final SimpleString key) {
- bufferValid = false;
-
- return properties.removeProperty(key);
- }
-
- @Override
- public Object removeProperty(final String key) {
- bufferValid = false;
-
- return properties.removeProperty(new SimpleString(key));
- }
-
- @Override
- public boolean containsProperty(final SimpleString key) {
- return properties.containsProperty(key);
- }
-
- @Override
- public boolean containsProperty(final String key) {
- return properties.containsProperty(new SimpleString(key));
- }
-
- @Override
- public Set<SimpleString> getPropertyNames() {
- return properties.getPropertyNames();
- }
-
- @Override
- public ActiveMQBuffer getWholeBuffer() {
- return buffer;
- }
-
- @Override
- public BodyEncoder getBodyEncoder() throws ActiveMQException {
- return new DecodingContext();
- }
-
- @Override
- public TypedProperties getTypedProperties() {
- return this.properties;
- }
-
- @Override
- public boolean equals(Object other) {
-
- if (this == other) {
- return true;
- }
-
- if (other instanceof MessageImpl) {
- MessageImpl message = (MessageImpl) other;
-
- if (this.getMessageID() == message.getMessageID())
- return true;
- }
-
- return false;
- }
-
- /**
- * Debug Helper!!!!
- *
- * I'm leaving this message here without any callers for a reason:
- * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them.
- * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!!
- *
- * @return
- */
- public String bodyToString() {
- getEndOfBodyPosition();
- int readerIndex1 = this.buffer.readerIndex();
- buffer.readerIndex(0);
- byte[] buffer1 = new byte[buffer.writerIndex()];
- buffer.readBytes(buffer1);
- buffer.readerIndex(readerIndex1);
-
- byte[] buffer2 = null;
- if (bodyBuffer != null) {
- int readerIndex2 = this.bodyBuffer.readerIndex();
- bodyBuffer.readerIndex(0);
- buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
- bodyBuffer.readBytes(buffer2);
- bodyBuffer.readerIndex(readerIndex2);
- return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
- } else {
- return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1);
- }
-
- }
-
- @Override
- public int hashCode() {
- return 31 + (int) (messageID ^ (messageID >>> 32));
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- public TypedProperties getProperties() {
- return properties;
- }
-
- // This must be synchronized as it can be called concurrently id the message is being delivered
- // concurrently to
- // many queues - the first caller in this case will actually encode it
- private synchronized ActiveMQBuffer encodeToBuffer() {
- if (!bufferValid) {
- int bodySize = getEndOfBodyPosition();
-
- // write it
- buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
-
- // Position at end of body and skip past the message end position int.
- // check for enough room in the buffer even though it is dynamic
- if ((bodySize + 4) > buffer.capacity()) {
- buffer.setIndex(0, bodySize);
- buffer.writeInt(0);
- } else {
- buffer.setIndex(0, bodySize + DataConstants.SIZE_INT);
- }
-
- encodeHeadersAndProperties(buffer);
-
- // Write end of message position
-
- endOfMessagePosition = buffer.writerIndex();
-
- buffer.setInt(bodySize, endOfMessagePosition);
-
- bufferValid = true;
- }
-
- return buffer;
- }
-
- private void decode() {
- endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE);
-
- buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
-
- decodeHeadersAndProperties(buffer);
-
- endOfMessagePosition = buffer.readerIndex();
-
- bufferValid = true;
- }
-
- public void createBody(final int initialMessageBufferSize) {
- buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize);
-
- // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
- buffer.writeByte((byte) 0);
-
- buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
- }
-
- // Inner classes -------------------------------------------------
-
- private final class DecodingContext implements BodyEncoder {
-
- private int lastPos = 0;
-
- private DecodingContext() {
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public long getLargeBodySize() {
- return buffer.writerIndex();
- }
-
- @Override
- public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
- ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
- return encode(buffer, bufferRead.capacity());
- }
-
- @Override
- public int encode(final ActiveMQBuffer bufferOut, final int size) {
- bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
deleted file mode 100644
index a7b2199..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message.impl;
-
-import java.io.InputStream;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public interface MessageInternal extends Message {
-
- void decodeFromBuffer(ActiveMQBuffer buffer);
-
- int getEndOfMessagePosition();
-
- int getEndOfBodyPosition();
-
- void bodyChanged();
-
- boolean isServerMessage();
-
- ActiveMQBuffer getEncodedBuffer();
-
- int getHeadersAndPropertiesEncodeSize();
-
- ActiveMQBuffer getWholeBuffer();
-
- void encodeHeadersAndProperties(ActiveMQBuffer buffer);
-
- void decodeHeadersAndProperties(ActiveMQBuffer buffer);
-
- BodyEncoder getBodyEncoder() throws ActiveMQException;
-
- InputStream getBodyInputStream();
-
- void setAddressTransient(SimpleString address);
-
- TypedProperties getTypedProperties();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index ae1cf71..9975a5b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
@@ -103,7 +104,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -422,12 +422,12 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public int getCreditsOnSendingFull(MessageInternal msgI) {
+ public int getCreditsOnSendingFull(Message msgI) {
return msgI.getEncodeSize();
}
@Override
- public void sendFullMessage(MessageInternal msgI,
+ public void sendFullMessage(Message msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
@@ -441,16 +441,16 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException {
+ public int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException {
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI);
sessionChannel.send(initialChunk);
- return msgI.getHeadersAndPropertiesEncodeSize();
+ return ((CoreMessage)msgI).getHeadersAndPropertiesEncodeSize();
}
@Override
- public int sendLargeMessageChunk(MessageInternal msgI,
+ public int sendLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
@@ -471,7 +471,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public int sendServerLargeMessageChunk(MessageInternal msgI,
+ public int sendServerLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 0f5cdf0..e95227d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -371,6 +371,7 @@ public final class ChannelImpl implements Channel {
if (logger.isTraceEnabled()) {
logger.trace("Sending blocking " + packet);
}
+
connection.getTransportConnection().write(buffer, false, false);
long toWait = connection.getBlockingCallTimeout();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 9025210..08c17e4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -25,6 +28,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class PacketImpl implements Packet {
// Constants -------------------------------------------------------------------------
+
public static final int ADDRESSING_CHANGE_VERSION = 129;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -310,7 +314,7 @@ public class PacketImpl implements Packet {
@Override
public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
- ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
+ ActiveMQBuffer buffer = createPacket(connection, usePooled);
// The standard header fields
@@ -330,6 +334,14 @@ public class PacketImpl implements Packet {
return buffer;
}
+ protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+ if (connection == null) {
+ return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE));
+ } else {
+ return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
+ }
+ }
+
@Override
public void decode(final ActiveMQBuffer buffer) {
channelID = buffer.readLong();
@@ -339,6 +351,22 @@ public class PacketImpl implements Packet {
size = buffer.readerIndex();
}
+ protected ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) {
+
+ ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes);
+
+ int read = buffer.readerIndex();
+ int writ = buffer.writerIndex();
+ buffer.readerIndex(PACKET_HEADERS_SIZE);
+
+ newNettyBuffer.writeBytes(buffer, buffer.readableBytes() - skipBytes);
+ buffer.setIndex( read, writ );
+ newNettyBuffer.setIndex( 0, writ - PACKET_HEADERS_SIZE - skipBytes);
+
+ return newNettyBuffer;
+ }
+
+
@Override
public int getPacketSize() {
if (size == -1) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 8bd62ca..cada061 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -353,6 +353,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
dataReceived = true;
+
doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index 6a52a27..ec2520a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -16,15 +16,19 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
- protected MessageInternal message;
+ protected Message message;
- public MessagePacket(final byte type, final MessageInternal message) {
+ public MessagePacket(final byte type, final Message message) {
super(type);
this.message = message;
@@ -40,4 +44,12 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
return super.getParentString() + ", message=" + message;
}
+ protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection, boolean usePooled) {
+ if (connection == null) {
+ return new ChannelBufferWrapper(Unpooled.buffer(size));
+ } else {
+ return connection.createTransportBuffer(size, usePooled);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
index 66e509c..e9e3138 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
@@ -17,12 +17,13 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
public class SessionReceiveClientLargeMessage extends SessionReceiveLargeMessage {
- public SessionReceiveClientLargeMessage(MessageInternal message) {
+ public SessionReceiveClientLargeMessage(Message message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
index 64f96f9..dc2c458 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
@@ -18,12 +18,12 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI {
- private final MessageInternal message;
+ private final Message message;
/**
* Since we receive the message before the entire message was received,
@@ -35,13 +35,13 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
private int deliveryCount;
// To be used on decoding at the client while receiving a large message
- public SessionReceiveLargeMessage(final MessageInternal message) {
+ public SessionReceiveLargeMessage(final Message message) {
super(SESS_RECEIVE_LARGE_MSG);
this.message = message;
}
public SessionReceiveLargeMessage(final long consumerID,
- final MessageInternal message,
+ final Message message,
final long largeMessageSize,
final int deliveryCount) {
super(SESS_RECEIVE_LARGE_MSG);
@@ -55,7 +55,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
this.largeMessageSize = largeMessageSize;
}
- public MessageInternal getLargeMessage() {
+ public Message getLargeMessage() {
return message;
}
@@ -85,7 +85,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
buffer.writeInt(deliveryCount);
buffer.writeLong(largeMessageSize);
if (message != null) {
- message.encodeHeadersAndProperties(buffer);
+ ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf());
}
}
@@ -94,7 +94,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
consumerID = buffer.readLong();
deliveryCount = buffer.readInt();
largeMessageSize = buffer.readLong();
- message.decodeHeadersAndProperties(buffer);
+ ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index c21ebda..c03d3c8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -17,7 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -30,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket {
private int deliveryCount;
- public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount) {
+ public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount) {
super(SESS_RECEIVE_MSG, message);
this.consumerID = consumerID;
@@ -38,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket {
this.deliveryCount = deliveryCount;
}
- public SessionReceiveMessage(final MessageInternal message) {
+ public SessionReceiveMessage(final Message message) {
super(SESS_RECEIVE_MSG, message);
}
@@ -53,53 +54,28 @@ public class SessionReceiveMessage extends MessagePacket {
}
@Override
- public ActiveMQBuffer encode(final RemotingConnection connection) {
- ActiveMQBuffer buffer = message.getEncodedBuffer();
-
- ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
- bufferWrite.writeBytes(buffer, 0, buffer.capacity());
- bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
- // Sanity check
- if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
- throw new IllegalStateException("Wrong encode position");
- }
-
- bufferWrite.writeLong(consumerID);
- bufferWrite.writeInt(deliveryCount);
-
- size = bufferWrite.writerIndex();
-
- // Write standard headers
-
- int len = size - DataConstants.SIZE_INT;
- bufferWrite.setInt(0, len);
- bufferWrite.setByte(DataConstants.SIZE_INT, getType());
- bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-
- // Position reader for reading by Netty
- bufferWrite.setIndex(0, size);
-
- return bufferWrite;
+ protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+ return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled);
}
@Override
- public void decode(final ActiveMQBuffer buffer) {
- channelID = buffer.readLong();
-
- message.decodeFromBuffer(buffer);
-
- consumerID = buffer.readLong();
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer(buffer.byteBuf(), deliveryCount);
+ buffer.writeLong(consumerID);
+ buffer.writeInt(deliveryCount);
+ }
- deliveryCount = buffer.readInt();
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ // Buffer comes in after having read standard headers and positioned at Beginning of body part
- size = buffer.readerIndex();
+ message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
- // Need to position buffer for reading
+ buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
+ this.consumerID = buffer.readLong();
+ this.deliveryCount = buffer.readInt();
- buffer.setIndex(PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
}
-
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index b4ec027..0ecfe33 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -17,8 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
/**
* A SessionSendContinuationMessage<br>
@@ -28,7 +28,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
private boolean requiresResponse;
// Used on confirmation handling
- private MessageInternal message;
+ private Message message;
/**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
* <br>
@@ -58,7 +58,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final MessageInternal message,
+ public SessionSendContinuationMessage(final Message message,
final byte[] body,
final boolean continues,
final boolean requiresResponse,
@@ -87,7 +87,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
/**
* @return the message
*/
- public MessageInternal getMessage() {
+ public Message getMessage() {
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
index bf4290b..869940c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
@@ -26,13 +26,13 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
/**
* Used only if largeMessage
*/
- private final MessageInternal largeMessage;
+ private final Message largeMessage;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionSendLargeMessage(final MessageInternal largeMessage) {
+ public SessionSendLargeMessage(final Message largeMessage) {
super(SESS_SEND_LARGE);
this.largeMessage = largeMessage;
@@ -40,7 +40,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
// Public --------------------------------------------------------
- public MessageInternal getLargeMessage() {
+ public Message getLargeMessage() {
return largeMessage;
}
@@ -51,12 +51,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
- largeMessage.encodeHeadersAndProperties(buffer);
+ ((CoreMessage)largeMessage).encodeHeadersAndProperties(buffer.byteBuf());
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
- largeMessage.decodeHeadersAndProperties(buffer);
+ ((CoreMessage)largeMessage).decodeHeadersAndProperties(buffer.byteBuf());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index c7bb30e..8182b90 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage extends MessagePacket {
@@ -36,7 +37,8 @@ public class SessionSendMessage extends MessagePacket {
*/
private final transient SendAcknowledgementHandler handler;
- public SessionSendMessage(final MessageInternal message,
+ /** This will be using the CoreMessage because it is meant for the core-protocol */
+ public SessionSendMessage(final Message message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
super(SESS_SEND, message);
@@ -44,7 +46,7 @@ public class SessionSendMessage extends MessagePacket {
this.requiresResponse = requiresResponse;
}
- public SessionSendMessage(final MessageInternal message) {
+ public SessionSendMessage(final CoreMessage message) {
super(SESS_SEND, message);
this.handler = null;
}
@@ -60,53 +62,29 @@ public class SessionSendMessage extends MessagePacket {
}
@Override
- public ActiveMQBuffer encode(final RemotingConnection connection) {
- ActiveMQBuffer buffer = message.getEncodedBuffer();
-
- ActiveMQBuffer bufferWrite;
- if (connection == null) {
- // this is for unit tests only
- bufferWrite = buffer.copy(0, buffer.capacity());
- } else {
- bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
- }
- bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
- bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
- // Sanity check
- if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
- throw new IllegalStateException("Wrong encode position");
- }
-
- bufferWrite.writeBoolean(requiresResponse);
-
- size = bufferWrite.writerIndex();
-
- // Write standard headers
+ protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+ return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled);
+ }
- int len = size - DataConstants.SIZE_INT;
- bufferWrite.setInt(0, len);
- bufferWrite.setByte(DataConstants.SIZE_INT, getType());
- bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+ @Override
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer(buffer.byteBuf(), 0);
+ buffer.writeBoolean(requiresResponse);
- // Position reader for reading by Netty
- bufferWrite.readerIndex(0);
- return bufferWrite;
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
- message.decodeFromBuffer(buffer);
+ ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
+ message.receiveBuffer(messageBuffer);
- int ri = buffer.readerIndex();
+ buffer.readerIndex(buffer.capacity() - 1);
requiresResponse = buffer.readBoolean();
- buffer.readerIndex(ri);
-
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
index 65aeccb..8560f5d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
@@ -26,7 +26,7 @@ public class MapMessageUtil extends MessageUtil {
*/
public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
message.resetWriterIndex();
- properties.encode(message);
+ properties.encode(message.byteBuf());
}
/**
@@ -43,7 +43,7 @@ public class MapMessageUtil extends MessageUtil {
*/
public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
message.resetReaderIndex();
- map.decode(message);
+ map.decode(message.byteBuf());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 72795b7..8bb0081 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.IDGenerator;
@@ -128,9 +127,9 @@ public abstract class SessionContext {
}
- public abstract int getCreditsOnSendingFull(MessageInternal msgI);
+ public abstract int getCreditsOnSendingFull(Message msgI);
- public abstract void sendFullMessage(MessageInternal msgI,
+ public abstract void sendFullMessage(Message msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException;
@@ -142,9 +141,9 @@ public abstract class SessionContext {
* @return
* @throws ActiveMQException
*/
- public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException;
+ public abstract int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException;
- public abstract int sendLargeMessageChunk(MessageInternal msgI,
+ public abstract int sendLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
@@ -152,7 +151,7 @@ public abstract class SessionContext {
int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
- public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
+ public abstract int sendServerLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/46be6a2f/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
new file mode 100644
index 0000000..5e92eaf
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.message;
+
+import java.util.LinkedList;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class CoreMessageTest {
+
+ public static final SimpleString ADDRESS = new SimpleString("this.local.address");
+ public static final byte MESSAGE_TYPE = Message.TEXT_TYPE;
+ public static final boolean DURABLE = true;
+ public static final long EXPIRATION = 123L;
+ public static final long TIMESTAMP = 321L;
+ public static final byte PRIORITY = (byte) 3;
+ public static final String TEXT = "hi";
+ public static final String BIGGER_TEXT = "AAAAAAAAAAAAAAAAAAAAAAAAA ASDF ASDF ASF ASD ASF ASDF ASDF ASDF ASF ADSF ASDF";
+ public static final String SMALLER_TEXT = "H";
+ public static final UUID uuid = new UUID(UUID.TYPE_TIME_BASED, new byte[]{0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 0, 0, 0, 1});
+ public static final SimpleString PROP1_NAME = new SimpleString("t1");
+ public static final SimpleString PROP1_VALUE = new SimpleString("value-t1");
+
+ /**
+ * This encode was generated by {@link #generate()}.
+ * Run it manually with a right-click on the IDE to eventually update it
+ * */
+ // body = "hi";
+ private final String STRING_ENCODE = "AAAAFgEAAAAEaABpAAAAAAAAAAAAAQAAACR0AGgAaQBzAC4AbABvAGMAYQBsAC4AYQBkAGQAcgBlAHMAcwAAAwEAAAAAAAAAewAAAAAAAAFBAwEAAAABAAAABHQAMQAKAAAAEHYAYQBsAHUAZQAtAHQAMQA=";
+
+ private ByteBuf BYTE_ENCODE;
+
+
+ @Before
+ public void before() {
+ BYTE_ENCODE = Unpooled.wrappedBuffer(Base64.decode(STRING_ENCODE, Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
+ // some extra caution here, nothing else, to make sure we would get the same encoding back
+ Assert.assertEquals(STRING_ENCODE, encodeString(BYTE_ENCODE.array()));
+ BYTE_ENCODE.readerIndex(0).writerIndex(BYTE_ENCODE.capacity());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void testPassThrough() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void sendThroughPackets() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ int encodeSize = decodedMessage.getEncodeSize();
+ Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+ SessionSendMessage sendMessage = new SessionSendMessage(decodedMessage, true, null);
+ sendMessage.setChannelID(777);
+
+ ActiveMQBuffer buffer = sendMessage.encode(null);
+
+ byte[] byteArray = buffer.byteBuf().array();
+ System.out.println("Sending " + ByteUtil.bytesToHex(buffer.toByteBuffer().array(), 1) + ", bytes = " + byteArray.length);
+
+ buffer.readerIndex(5);
+
+ SessionSendMessage sendMessageReceivedSent = new SessionSendMessage(new CoreMessage());
+
+ sendMessageReceivedSent.decode(buffer);
+
+ Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+ Assert.assertTrue(sendMessageReceivedSent.isRequiresResponse());
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void sendThroughPacketsClient() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ int encodeSize = decodedMessage.getEncodeSize();
+ Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+ SessionReceiveMessage sendMessage = new SessionReceiveMessage(33, decodedMessage, 7);
+ sendMessage.setChannelID(777);
+
+ ActiveMQBuffer buffer = sendMessage.encode(null);
+
+ buffer.readerIndex(5);
+
+ SessionReceiveMessage sendMessageReceivedSent = new SessionReceiveMessage(new CoreMessage());
+
+ sendMessageReceivedSent.decode(buffer);
+
+ Assert.assertEquals(33, sendMessageReceivedSent.getConsumerID());
+
+ Assert.assertEquals(7, sendMessageReceivedSent.getDeliveryCount());
+
+ Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+ }
+
+ private CoreMessage decodeMessage() {
+
+ ByteBuf newBuffer = Unpooled.buffer(BYTE_ENCODE.capacity());
+ newBuffer.writeBytes(BYTE_ENCODE, 0, BYTE_ENCODE.writerIndex());
+
+ CoreMessage coreMessage = internalDecode(newBuffer);
+
+ int encodeSize = coreMessage.getEncodeSize();
+
+ Assert.assertEquals(newBuffer.capacity(), encodeSize);
+
+ Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+
+ Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+ ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+ coreMessage.sendBuffer(destinedBuffer, 0);
+
+ byte[] destinedArray = destinedBuffer.array();
+ byte[] sourceArray = BYTE_ENCODE.array();
+
+ CoreMessage newDecoded = internalDecode(Unpooled.wrappedBuffer(destinedArray));
+
+ Assert.assertEquals(encodeSize, newDecoded.getEncodeSize());
+
+ Assert.assertArrayEquals(sourceArray, destinedArray);
+
+ return coreMessage;
+ }
+
+ private CoreMessage internalDecode(ByteBuf bufferOrigin) {
+ CoreMessage coreMessage = new CoreMessage();
+// System.out.println("Bytes from test " + ByteUtil.bytesToHex(bufferOrigin.array(), 1));
+ coreMessage.receiveBuffer(bufferOrigin);
+ return coreMessage;
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void testChangeBodyStringSameSize() {
+ testChangeBodyString(TEXT.toUpperCase());
+ }
+
+ @Test
+ public void testChangeBodyBiggerString() {
+ testChangeBodyString(BIGGER_TEXT);
+ }
+
+ @Test
+ public void testGenerateEmpty() {
+ CoreMessage empty = new CoreMessage().initBuffer(100);
+ ByteBuf buffer = Unpooled.buffer(200);
+ empty.sendBuffer(buffer, 0);
+
+ CoreMessage empty2 = new CoreMessage();
+ empty2.receiveBuffer(buffer);
+
+ try {
+ empty2.getBodyBuffer().readByte();
+ Assert.fail("should throw exception");
+ } catch (Exception expected) {
+
+ }
+ }
+
+ @Test
+ public void testSaveReceiveLimitedBytes() {
+ CoreMessage empty = new CoreMessage().initBuffer(100);
+ System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+ empty.getBodyBuffer().writeByte((byte)7);
+ System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+
+ ByteBuf buffer = Unpooled.buffer(200);
+ empty.sendBuffer(buffer, 0);
+
+ CoreMessage empty2 = new CoreMessage();
+ empty2.receiveBuffer(buffer);
+
+ Assert.assertEquals((byte)7, empty2.getBodyBuffer().readByte());
+
+ System.out.println("Readable :: " + empty2.getBodyBuffer().readerIndex() + " writer :" + empty2.getBodyBuffer().writerIndex());
+
+ try {
+ empty2.getBodyBuffer().readByte();
+ Assert.fail("should throw exception");
+ } catch (Exception expected) {
+
+ }
+ }
+
+ @Test
+ public void testChangeBodySmallerString() {
+ testChangeBodyString(SMALLER_TEXT);
+ }
+
+ public void testChangeBodyString(String newString) {
+ CoreMessage coreMessage = decodeMessage();
+
+ coreMessage.putStringProperty("newProperty", "newValue");
+ ActiveMQBuffer legacyBuffer = coreMessage.getBodyBuffer();
+ legacyBuffer.resetWriterIndex();
+ legacyBuffer.clear();
+
+ TextMessageUtil.writeBodyText(legacyBuffer, SimpleString.toSimpleString(newString));
+
+ ByteBuf newbuffer = Unpooled.buffer(150000);
+
+ coreMessage.sendBuffer(newbuffer, 0);
+ newbuffer.readerIndex(0);
+
+ CoreMessage newCoreMessage = new CoreMessage();
+ newCoreMessage.receiveBuffer(newbuffer);
+
+
+ SimpleString newText = TextMessageUtil.readBodyText(newCoreMessage.getReadOnlyBodyBuffer());
+
+ Assert.assertEquals(newString, newText.toString());
+
+// coreMessage.putStringProperty()
+ }
+
+ @Test
+ public void testPassThroughMultipleThreads() throws Throwable {
+ CoreMessage coreMessage = new CoreMessage();
+ coreMessage.receiveBuffer(BYTE_ENCODE);
+
+ LinkedList<Throwable> errors = new LinkedList<>();
+
+ Thread[] threads = new Thread[50];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ try {
+ for (int j = 0; j < 50; j++) {
+ Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+ Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+ ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+ coreMessage.sendBuffer(destinedBuffer, 0);
+
+ byte[] destinedArray = destinedBuffer.array();
+ byte[] sourceArray = BYTE_ENCODE.array();
+
+ Assert.assertArrayEquals(sourceArray, destinedArray);
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(coreMessage.getReadOnlyBodyBuffer()).toString());
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ });
+ }
+
+ for (Thread t : threads) {
+ t.start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ for (Throwable e: errors) {
+ throw e;
+ }
+
+ }
+
+ // This is to compare the original encoding with the current version
+ @Test
+ public void compareOriginal() throws Exception {
+ String generated = generate(TEXT);
+
+ Assert.assertEquals(STRING_ENCODE, generated);
+
+ for (int i = 0; i < generated.length(); i++) {
+ Assert.assertEquals("Chart at " + i + " was " + generated.charAt(i) + " instead of " + STRING_ENCODE.charAt(i), generated.charAt(i), STRING_ENCODE.charAt(i));
+ }
+ }
+
+ /** Use this method to update the encode for the known message */
+ @Ignore
+ @Test
+ public void generate() throws Exception {
+
+ printVariable(TEXT, generate(TEXT));
+ printVariable(SMALLER_TEXT, generate(SMALLER_TEXT));
+ printVariable(BIGGER_TEXT, generate(BIGGER_TEXT));
+
+ }
+
+ private void printVariable(String body, String encode) {
+ System.out.println("// body = \"" + body + "\";");
+ System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
+
+ }
+
+ public String generate(String body) throws Exception {
+
+ ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+ TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
+
+ message.setAddress(ADDRESS);
+ message.setUserID(uuid);
+ message.getProperties().putSimpleStringProperty(PROP1_NAME, PROP1_VALUE);
+
+
+ ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+ message.sendBuffer(buffer.byteBuf(), 0);
+
+ byte[] bytes = new byte[buffer.byteBuf().writerIndex()];
+ buffer.byteBuf().readBytes(bytes);
+
+ return encodeString(bytes);
+
+ // replace the code
+
+
+ }
+
+ private String encodeString(byte[] bytes) {
+ return Base64.encodeBytes(bytes, 0, bytes.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+}