You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 16:50:14 UTC
[13/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index c21ebda..b0ab52b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -17,7 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -30,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket {
private int deliveryCount;
- public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount) {
+ public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
super(SESS_RECEIVE_MSG, message);
this.consumerID = consumerID;
@@ -38,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket {
this.deliveryCount = deliveryCount;
}
- public SessionReceiveMessage(final MessageInternal message) {
+ public SessionReceiveMessage(final CoreMessage message) {
super(SESS_RECEIVE_MSG, message);
}
@@ -53,53 +54,28 @@ public class SessionReceiveMessage extends MessagePacket {
}
@Override
- public ActiveMQBuffer encode(final RemotingConnection connection) {
- ActiveMQBuffer buffer = message.getEncodedBuffer();
-
- ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
- bufferWrite.writeBytes(buffer, 0, buffer.capacity());
- bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
- // Sanity check
- if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
- throw new IllegalStateException("Wrong encode position");
- }
-
- bufferWrite.writeLong(consumerID);
- bufferWrite.writeInt(deliveryCount);
-
- size = bufferWrite.writerIndex();
-
- // Write standard headers
-
- int len = size - DataConstants.SIZE_INT;
- bufferWrite.setInt(0, len);
- bufferWrite.setByte(DataConstants.SIZE_INT, getType());
- bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-
- // Position reader for reading by Netty
- bufferWrite.setIndex(0, size);
-
- return bufferWrite;
+ protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+ return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled);
}
@Override
- public void decode(final ActiveMQBuffer buffer) {
- channelID = buffer.readLong();
-
- message.decodeFromBuffer(buffer);
-
- consumerID = buffer.readLong();
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer(buffer.byteBuf(), deliveryCount);
+ buffer.writeLong(consumerID);
+ buffer.writeInt(deliveryCount);
+ }
- deliveryCount = buffer.readInt();
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ // Buffer comes in after having read standard headers and positioned at Beginning of body part
- size = buffer.readerIndex();
+ message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
- // Need to position buffer for reading
+ buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
+ this.consumerID = buffer.readLong();
+ this.deliveryCount = buffer.readInt();
- buffer.setIndex(PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
}
-
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index b4ec027..0ecfe33 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -17,8 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
/**
* A SessionSendContinuationMessage<br>
@@ -28,7 +28,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
private boolean requiresResponse;
// Used on confirmation handling
- private MessageInternal message;
+ private Message message;
/**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
* <br>
@@ -58,7 +58,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final MessageInternal message,
+ public SessionSendContinuationMessage(final Message message,
final byte[] body,
final boolean continues,
final boolean requiresResponse,
@@ -87,7 +87,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
/**
* @return the message
*/
- public MessageInternal getMessage() {
+ public Message getMessage() {
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
index bf4290b..869940c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
@@ -26,13 +26,13 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
/**
* Used only if largeMessage
*/
- private final MessageInternal largeMessage;
+ private final Message largeMessage;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionSendLargeMessage(final MessageInternal largeMessage) {
+ public SessionSendLargeMessage(final Message largeMessage) {
super(SESS_SEND_LARGE);
this.largeMessage = largeMessage;
@@ -40,7 +40,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
// Public --------------------------------------------------------
- public MessageInternal getLargeMessage() {
+ public Message getLargeMessage() {
return largeMessage;
}
@@ -51,12 +51,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
- largeMessage.encodeHeadersAndProperties(buffer);
+ ((CoreMessage)largeMessage).encodeHeadersAndProperties(buffer.byteBuf());
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
- largeMessage.decodeHeadersAndProperties(buffer);
+ ((CoreMessage)largeMessage).decodeHeadersAndProperties(buffer.byteBuf());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index c7bb30e..43bb0be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage extends MessagePacket {
@@ -36,7 +37,8 @@ public class SessionSendMessage extends MessagePacket {
*/
private final transient SendAcknowledgementHandler handler;
- public SessionSendMessage(final MessageInternal message,
+ /** This will be using the CoreMessage because it is meant for the core-protocol */
+ public SessionSendMessage(final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
super(SESS_SEND, message);
@@ -44,7 +46,7 @@ public class SessionSendMessage extends MessagePacket {
this.requiresResponse = requiresResponse;
}
- public SessionSendMessage(final MessageInternal message) {
+ public SessionSendMessage(final CoreMessage message) {
super(SESS_SEND, message);
this.handler = null;
}
@@ -60,53 +62,29 @@ public class SessionSendMessage extends MessagePacket {
}
@Override
- public ActiveMQBuffer encode(final RemotingConnection connection) {
- ActiveMQBuffer buffer = message.getEncodedBuffer();
-
- ActiveMQBuffer bufferWrite;
- if (connection == null) {
- // this is for unit tests only
- bufferWrite = buffer.copy(0, buffer.capacity());
- } else {
- bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
- }
- bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
- bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
- // Sanity check
- if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
- throw new IllegalStateException("Wrong encode position");
- }
-
- bufferWrite.writeBoolean(requiresResponse);
-
- size = bufferWrite.writerIndex();
-
- // Write standard headers
+ protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+ return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled);
+ }
- int len = size - DataConstants.SIZE_INT;
- bufferWrite.setInt(0, len);
- bufferWrite.setByte(DataConstants.SIZE_INT, getType());
- bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+ @Override
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer(buffer.byteBuf(), 0);
+ buffer.writeBoolean(requiresResponse);
- // Position reader for reading by Netty
- bufferWrite.readerIndex(0);
- return bufferWrite;
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
- message.decodeFromBuffer(buffer);
+ ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
+ message.receiveBuffer(messageBuffer);
- int ri = buffer.readerIndex();
+ buffer.readerIndex(buffer.capacity() - 1);
requiresResponse = buffer.readBoolean();
- buffer.readerIndex(ri);
-
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
index 65aeccb..8560f5d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
@@ -26,7 +26,7 @@ public class MapMessageUtil extends MessageUtil {
*/
public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
message.resetWriterIndex();
- properties.encode(message);
+ properties.encode(message.byteBuf());
}
/**
@@ -43,7 +43,7 @@ public class MapMessageUtil extends MessageUtil {
*/
public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
message.resetReaderIndex();
- map.decode(message);
+ map.decode(message.byteBuf());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 72795b7..3fddb8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -23,7 +23,9 @@ import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -33,8 +35,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -128,9 +128,9 @@ public abstract class SessionContext {
}
- public abstract int getCreditsOnSendingFull(MessageInternal msgI);
+ public abstract int getCreditsOnSendingFull(Message msgI);
- public abstract void sendFullMessage(MessageInternal msgI,
+ public abstract void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException;
@@ -142,9 +142,9 @@ public abstract class SessionContext {
* @return
* @throws ActiveMQException
*/
- public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException;
+ public abstract int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException;
- public abstract int sendLargeMessageChunk(MessageInternal msgI,
+ public abstract int sendLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
@@ -152,7 +152,7 @@ public abstract class SessionContext {
int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
- public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
+ public abstract int sendServerLargeMessageChunk(Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
new file mode 100644
index 0000000..5e92eaf
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.message;
+
+import java.util.LinkedList;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class CoreMessageTest {
+
+ public static final SimpleString ADDRESS = new SimpleString("this.local.address");
+ public static final byte MESSAGE_TYPE = Message.TEXT_TYPE;
+ public static final boolean DURABLE = true;
+ public static final long EXPIRATION = 123L;
+ public static final long TIMESTAMP = 321L;
+ public static final byte PRIORITY = (byte) 3;
+ public static final String TEXT = "hi";
+ public static final String BIGGER_TEXT = "AAAAAAAAAAAAAAAAAAAAAAAAA ASDF ASDF ASF ASD ASF ASDF ASDF ASDF ASF ADSF ASDF";
+ public static final String SMALLER_TEXT = "H";
+ public static final UUID uuid = new UUID(UUID.TYPE_TIME_BASED, new byte[]{0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 0, 0, 0, 1});
+ public static final SimpleString PROP1_NAME = new SimpleString("t1");
+ public static final SimpleString PROP1_VALUE = new SimpleString("value-t1");
+
+ /**
+ * This encode was generated by {@link #generate()}.
+ * Run it manually with a right-click on the IDE to eventually update it
+ * */
+ // body = "hi";
+ private final String STRING_ENCODE = "AAAAFgEAAAAEaABpAAAAAAAAAAAAAQAAACR0AGgAaQBzAC4AbABvAGMAYQBsAC4AYQBkAGQAcgBlAHMAcwAAAwEAAAAAAAAAewAAAAAAAAFBAwEAAAABAAAABHQAMQAKAAAAEHYAYQBsAHUAZQAtAHQAMQA=";
+
+ private ByteBuf BYTE_ENCODE;
+
+
+ @Before
+ public void before() {
+ BYTE_ENCODE = Unpooled.wrappedBuffer(Base64.decode(STRING_ENCODE, Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
+ // some extra caution here, nothing else, to make sure we would get the same encoding back
+ Assert.assertEquals(STRING_ENCODE, encodeString(BYTE_ENCODE.array()));
+ BYTE_ENCODE.readerIndex(0).writerIndex(BYTE_ENCODE.capacity());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void testPassThrough() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void sendThroughPackets() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ int encodeSize = decodedMessage.getEncodeSize();
+ Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+ SessionSendMessage sendMessage = new SessionSendMessage(decodedMessage, true, null);
+ sendMessage.setChannelID(777);
+
+ ActiveMQBuffer buffer = sendMessage.encode(null);
+
+ byte[] byteArray = buffer.byteBuf().array();
+ System.out.println("Sending " + ByteUtil.bytesToHex(buffer.toByteBuffer().array(), 1) + ", bytes = " + byteArray.length);
+
+ buffer.readerIndex(5);
+
+ SessionSendMessage sendMessageReceivedSent = new SessionSendMessage(new CoreMessage());
+
+ sendMessageReceivedSent.decode(buffer);
+
+ Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+ Assert.assertTrue(sendMessageReceivedSent.isRequiresResponse());
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void sendThroughPacketsClient() {
+ CoreMessage decodedMessage = decodeMessage();
+
+ int encodeSize = decodedMessage.getEncodeSize();
+ Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+ SessionReceiveMessage sendMessage = new SessionReceiveMessage(33, decodedMessage, 7);
+ sendMessage.setChannelID(777);
+
+ ActiveMQBuffer buffer = sendMessage.encode(null);
+
+ buffer.readerIndex(5);
+
+ SessionReceiveMessage sendMessageReceivedSent = new SessionReceiveMessage(new CoreMessage());
+
+ sendMessageReceivedSent.decode(buffer);
+
+ Assert.assertEquals(33, sendMessageReceivedSent.getConsumerID());
+
+ Assert.assertEquals(7, sendMessageReceivedSent.getDeliveryCount());
+
+ Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+ }
+
+ private CoreMessage decodeMessage() {
+
+ ByteBuf newBuffer = Unpooled.buffer(BYTE_ENCODE.capacity());
+ newBuffer.writeBytes(BYTE_ENCODE, 0, BYTE_ENCODE.writerIndex());
+
+ CoreMessage coreMessage = internalDecode(newBuffer);
+
+ int encodeSize = coreMessage.getEncodeSize();
+
+ Assert.assertEquals(newBuffer.capacity(), encodeSize);
+
+ Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+
+ Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+ ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+ coreMessage.sendBuffer(destinedBuffer, 0);
+
+ byte[] destinedArray = destinedBuffer.array();
+ byte[] sourceArray = BYTE_ENCODE.array();
+
+ CoreMessage newDecoded = internalDecode(Unpooled.wrappedBuffer(destinedArray));
+
+ Assert.assertEquals(encodeSize, newDecoded.getEncodeSize());
+
+ Assert.assertArrayEquals(sourceArray, destinedArray);
+
+ return coreMessage;
+ }
+
+ private CoreMessage internalDecode(ByteBuf bufferOrigin) {
+ CoreMessage coreMessage = new CoreMessage();
+// System.out.println("Bytes from test " + ByteUtil.bytesToHex(bufferOrigin.array(), 1));
+ coreMessage.receiveBuffer(bufferOrigin);
+ return coreMessage;
+ }
+
+ /** The message is received, then sent to the other side untouched */
+ @Test
+ public void testChangeBodyStringSameSize() {
+ testChangeBodyString(TEXT.toUpperCase());
+ }
+
+ @Test
+ public void testChangeBodyBiggerString() {
+ testChangeBodyString(BIGGER_TEXT);
+ }
+
+ @Test
+ public void testGenerateEmpty() {
+ CoreMessage empty = new CoreMessage().initBuffer(100);
+ ByteBuf buffer = Unpooled.buffer(200);
+ empty.sendBuffer(buffer, 0);
+
+ CoreMessage empty2 = new CoreMessage();
+ empty2.receiveBuffer(buffer);
+
+ try {
+ empty2.getBodyBuffer().readByte();
+ Assert.fail("should throw exception");
+ } catch (Exception expected) {
+
+ }
+ }
+
+ @Test
+ public void testSaveReceiveLimitedBytes() {
+ CoreMessage empty = new CoreMessage().initBuffer(100);
+ System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+ empty.getBodyBuffer().writeByte((byte)7);
+ System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+
+ ByteBuf buffer = Unpooled.buffer(200);
+ empty.sendBuffer(buffer, 0);
+
+ CoreMessage empty2 = new CoreMessage();
+ empty2.receiveBuffer(buffer);
+
+ Assert.assertEquals((byte)7, empty2.getBodyBuffer().readByte());
+
+ System.out.println("Readable :: " + empty2.getBodyBuffer().readerIndex() + " writer :" + empty2.getBodyBuffer().writerIndex());
+
+ try {
+ empty2.getBodyBuffer().readByte();
+ Assert.fail("should throw exception");
+ } catch (Exception expected) {
+
+ }
+ }
+
+ @Test
+ public void testChangeBodySmallerString() {
+ testChangeBodyString(SMALLER_TEXT);
+ }
+
+ public void testChangeBodyString(String newString) {
+ CoreMessage coreMessage = decodeMessage();
+
+ coreMessage.putStringProperty("newProperty", "newValue");
+ ActiveMQBuffer legacyBuffer = coreMessage.getBodyBuffer();
+ legacyBuffer.resetWriterIndex();
+ legacyBuffer.clear();
+
+ TextMessageUtil.writeBodyText(legacyBuffer, SimpleString.toSimpleString(newString));
+
+ ByteBuf newbuffer = Unpooled.buffer(150000);
+
+ coreMessage.sendBuffer(newbuffer, 0);
+ newbuffer.readerIndex(0);
+
+ CoreMessage newCoreMessage = new CoreMessage();
+ newCoreMessage.receiveBuffer(newbuffer);
+
+
+ SimpleString newText = TextMessageUtil.readBodyText(newCoreMessage.getReadOnlyBodyBuffer());
+
+ Assert.assertEquals(newString, newText.toString());
+
+// coreMessage.putStringProperty()
+ }
+
+ @Test
+ public void testPassThroughMultipleThreads() throws Throwable {
+ CoreMessage coreMessage = new CoreMessage();
+ coreMessage.receiveBuffer(BYTE_ENCODE);
+
+ LinkedList<Throwable> errors = new LinkedList<>();
+
+ Thread[] threads = new Thread[50];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ try {
+ for (int j = 0; j < 50; j++) {
+ Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+ Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+ ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+ coreMessage.sendBuffer(destinedBuffer, 0);
+
+ byte[] destinedArray = destinedBuffer.array();
+ byte[] sourceArray = BYTE_ENCODE.array();
+
+ Assert.assertArrayEquals(sourceArray, destinedArray);
+
+ Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(coreMessage.getReadOnlyBodyBuffer()).toString());
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ });
+ }
+
+ for (Thread t : threads) {
+ t.start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ for (Throwable e: errors) {
+ throw e;
+ }
+
+ }
+
+ // This is to compare the original encoding with the current version
+ @Test
+ public void compareOriginal() throws Exception {
+ String generated = generate(TEXT);
+
+ Assert.assertEquals(STRING_ENCODE, generated);
+
+ for (int i = 0; i < generated.length(); i++) {
+ Assert.assertEquals("Chart at " + i + " was " + generated.charAt(i) + " instead of " + STRING_ENCODE.charAt(i), generated.charAt(i), STRING_ENCODE.charAt(i));
+ }
+ }
+
+ /** Use this method to update the encode for the known message */
+ @Ignore
+ @Test
+ public void generate() throws Exception {
+
+ printVariable(TEXT, generate(TEXT));
+ printVariable(SMALLER_TEXT, generate(SMALLER_TEXT));
+ printVariable(BIGGER_TEXT, generate(BIGGER_TEXT));
+
+ }
+
+ private void printVariable(String body, String encode) {
+ System.out.println("// body = \"" + body + "\";");
+ System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
+
+ }
+
+ public String generate(String body) throws Exception {
+
+ ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+ TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
+
+ message.setAddress(ADDRESS);
+ message.setUserID(uuid);
+ message.getProperties().putSimpleStringProperty(PROP1_NAME, PROP1_VALUE);
+
+
+ ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+ message.sendBuffer(buffer.byteBuf(), 0);
+
+ byte[] bytes = new byte[buffer.byteBuf().writerIndex()];
+ buffer.byteBuf().readBytes(bytes);
+
+ return encodeString(bytes);
+
+ // replace the code
+
+
+ }
+
+ private String encodeString(byte[] bytes) {
+ return Base64.encodeBytes(bytes, 0, bytes.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index eb7cda1..2108be7 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
appendRecord(r);
}
@@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
@@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
appendRecord(r);
}
@@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendUpdateRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
@@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setTxId(txID);
appendRecord(r);
}
@@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setTxId(txID);
appendRecord(r);
}
@@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
- r.setRecord(record);
+ r.setRecord(EncoderPersister.getInstance(), record);
r.setTxId(txID);
appendRecord(r);
}
@@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void perfBlast(int pages) {
- }
-
- @Override
public void runDirectJournalBlast() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index 9691d3e..b094164 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -237,11 +238,11 @@ class JDBCJournalRecord {
this.record = record;
}
- public void setRecord(EncodingSupport record) {
- this.variableSize = record.getEncodeSize();
+ public void setRecord(Persister persister, Object record) {
+ this.variableSize = persister.getEncodeSize(record);
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
- record.encode(encodedBuffer);
+ persister.encode(encodedBuffer, record);
this.record = new ActiveMQBufferInputStream(encodedBuffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
index 0e99106..4d0306b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management;
import javax.jms.JMSException;
import javax.jms.Message;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
@@ -27,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
*/
public class JMSManagementHelper {
- private static org.apache.activemq.artemis.api.core.Message getCoreMessage(final Message jmsMessage) {
+ private static ClientMessage getCoreMessage(final Message jmsMessage) {
if (jmsMessage instanceof ActiveMQMessage == false) {
throw new IllegalArgumentException("Cannot send a foreign message as a management message " + jmsMessage.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 59f04e8..6da3912 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
if (bodyLength == 0)
return null;
byte[] dst = new byte[bodyLength];
- message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst);
+ message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst);
return (T) dst;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 47dcfb2..80a07ac 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public String getJMSMessageID() {
if (msgID == null) {
- UUID uid = message.getUserID();
+ UUID uid = (UUID)message.getUserID();
msgID = uid == null ? null : "ID:" + uid.toString();
}
@@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
- SimpleString address = message.getAddress();
+ SimpleString address = message.getAddressSimpleString();
String prefix = "";
if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
@@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@SuppressWarnings("unchecked")
protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException {
- InputStream is = ((MessageInternal) message).getBodyInputStream();
+ InputStream is = ((ClientMessageInternal) message).getBodyInputStream();
try {
ObjectInputStream ois = new ObjectInputStream(is);
return (T) ois.readObject();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
index 6cf20ff..ecb4ccb 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
@@ -19,7 +19,8 @@ package org.apache.activemq.artemis.jms.transaction;
import javax.transaction.xa.Xid;
import java.util.Map;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionDetail;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
@@ -36,8 +37,11 @@ public class JMSTransactionDetail extends TransactionDetail {
}
@Override
- public String decodeMessageType(ServerMessage msg) {
- int type = msg.getType();
+ public String decodeMessageType(Message msg) {
+ if (!(msg instanceof ICoreMessage)) {
+ return "N/A";
+ }
+ int type = ((ICoreMessage) msg).getType();
switch (type) {
case ActiveMQMessage.TYPE: // 0
return "Default";
@@ -57,7 +61,7 @@ public class JMSTransactionDetail extends TransactionDetail {
}
@Override
- public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+ public Map<String, Object> decodeMessageProperties(Message msg) {
try {
return ActiveMQMessage.coreMaptoJMSMap(msg.toMap());
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
new file mode 100644
index 0000000..8fc2a5aa
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+/** This is a facade between the new Persister and the former EncodingSupport.
+ * Methods using the old interface will use this as a facade to provide the previous semantic. */
+public class EncoderPersister implements Persister<EncodingSupport> {
+
+ private static final EncoderPersister theInstance = new EncoderPersister();
+
+ private EncoderPersister() {
+ }
+
+ public static EncoderPersister getInstance() {
+ return theInstance;
+ }
+
+ @Override
+ public int getEncodeSize(EncodingSupport record) {
+ return record.getEncodeSize();
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer, EncodingSupport record) {
+ record.encode(buffer);
+ }
+
+ @Override
+ public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) {
+ record.decode(buffer);
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index fbd4182..ca194b8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
/**
@@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent {
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+ }
+
+ void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception;
+ default void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception {
+ appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+ }
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+ }
- void appendUpdateRecord(long id,
+ void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
+
+ default void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
- IOCompletion completionCallback) throws Exception;
+ IOCompletion completionCallback) throws Exception {
+ appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+ }
+
+ void appendUpdateRecord(final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
void appendDeleteRecord(long id, boolean sync) throws Exception;
@@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent {
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+ appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+ }
+
+ void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record) throws Exception;
void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+ appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+ }
+
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception;
void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
@@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent {
int getUserVersion();
- void perfBlast(int pages);
-
void runDirectJournalBlast() throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 8bbecd2..943077c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
}
}
- JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
+ JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0b702a5..8e5ca2c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase {
@Override
public void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion callback) throws Exception {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
writeRecord(addRecord, sync, callback);
}
@@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase {
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
count(txID);
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
writeRecord(addRecord, false, null);
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion callback) throws Exception {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
writeRecord(updateRecord, sync, callback);
}
@@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase {
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
count(txID);
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
writeRecord(updateRecordTX, false, null);
}
@@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase {
}
@Override
- public void perfBlast(int pages) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void runDirectJournalBlast() throws Exception {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index e2ca84d..e6bd99e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
abstract class JournalBase implements Journal {
@@ -37,68 +38,15 @@ abstract class JournalBase implements Journal {
}
@Override
- public abstract void appendAddRecord(final long id,
- final byte recordType,
- final EncodingSupport record,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendAddRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendCommitRecord(final long txID,
- final boolean sync,
- final IOCompletion callback,
- boolean lineUpContext) throws Exception;
-
- @Override
- public abstract void appendDeleteRecord(final long id,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendDeleteRecordTransactional(final long txID,
- final long id,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendPrepareRecord(final long txID,
- final EncodingSupport transactionData,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendUpdateRecord(final long id,
- final byte recordType,
- final EncodingSupport record,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendUpdateRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendRollbackRecord(final long txID,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
- appendAddRecord(id, recordType, record, sync, callback);
+ appendAddRecord(id, recordType, persister, record, sync, callback);
if (callback != null) {
callback.waitCompletion();
@@ -176,11 +124,12 @@ abstract class JournalBase implements Journal {
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
- appendUpdateRecord(id, recordType, record, sync, callback);
+ appendUpdateRecord(id, recordType, persister, record, sync, callback);
if (callback != null) {
callback.waitCompletion();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index b95d641..c62b27b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
@@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
if (lookupRecord(info.id)) {
- JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+ JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short) (info.compactCount + 1));
checkSize(addRecord.getEncodeSize(), info.compactCount);
@@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+ JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data));
record.setCompactCount((short) (info.compactCount + 1));
@@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
if (lookupRecord(info.id)) {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short) (info.compactCount + 1));
@@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short) (info.compactCount + 1));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index db615f8..24bb916 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX;
@@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
@@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void run() {
journalLock.readLock().lock();
try {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
@@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.readLock().lock();
try {
JournalRecord jrnRecord = records.get(id);
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
@@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
@@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void run() {
journalLock.readLock().lock();
try {
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) {
@@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
@@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.readLock().lock();
try {
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
if ( logger.isTraceEnabled() ) {
@@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- @Override
- public void perfBlast(final int pages) {
-
- checkJournalIsLoaded();
-
- final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
- final JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
- @Override
- public int getEncodeSize() {
- return byteEncoder.getEncodeSize();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- byteEncoder.encode(buffer);
- }
- };
-
- appendExecutor.execute(new Runnable() {
- @Override
- public void run() {
- journalLock.readLock().lock();
- try {
-
- for (int i = 0; i < pages; i++) {
- appendRecord(blastRecord, false, false, null, null);
- }
-
- } catch (Exception e) {
- ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
- } finally {
- journalLock.readLock().unlock();
- }
- }
- });
- }
-
// ActiveMQComponent implementation
// ---------------------------------------------------
@@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public int getCompactCount() {
return compactCount;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index c6a5d4a..6e5b651 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -17,14 +17,16 @@
package org.apache.activemq.artemis.core.journal.impl.dataformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecord extends JournalInternalRecord {
protected final long id;
- protected final EncodingSupport record;
+ protected final Persister persister;
+
+ protected final Object record;
protected final byte recordType;
@@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord {
* @param recordType
* @param record
*/
- public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) {
+ public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) {
this.id = id;
this.record = record;
@@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord {
this.recordType = recordType;
this.add = add;
+
+ this.persister = persister;
}
@Override
@@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord {
buffer.writeLong(id);
- buffer.writeInt(record.getEncodeSize());
+ int recordEncodeSize = persister.getEncodeSize(record);
+
+ buffer.writeInt(persister.getEncodeSize(record));
buffer.writeByte(recordType);
- record.encode(buffer);
+ persister.encode(buffer, record);
- buffer.writeInt(getEncodeSize());
+ buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1);
}
@Override
public int getEncodeSize() {
- return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
+ return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
index 6cec122..483418f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
@@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.journal.impl.dataformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecordTX extends JournalInternalRecord {
@@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord {
private final long id;
- private final EncodingSupport record;
+ protected final Persister persister;
+
+ protected final Object record;
private final byte recordType;
@@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord {
final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) {
+ final Persister persister,
+ Object record) {
this.txID = txID;
this.id = id;
+ this.persister = persister;
+
this.record = record;
this.recordType = recordType;
@@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord {
buffer.writeLong(id);
- buffer.writeInt(record.getEncodeSize());
+ buffer.writeInt(persister.getEncodeSize(record));
buffer.writeByte(recordType);
- record.encode(buffer);
+ persister.encode(buffer, record);
buffer.writeInt(getEncodeSize());
}
@Override
public int getEncodeSize() {
- return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
+ return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1;
}
}