You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:06 UTC
[13/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index d09f62f..2295987 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
@@ -81,9 +83,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@@ -137,13 +139,23 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private volatile CoreRemotingConnection remotingConnection;
+ private final CoreProtocolManager manager;
+
+ // The large message currently being processed
+ private volatile LargeServerMessage currentLargeMessage;
+
private final boolean direct;
- public ServerSessionPacketHandler(final ServerSession session,
+ public ServerSessionPacketHandler(final CoreProtocolManager manager,
+ final ServerSession session,
final StorageManager storageManager,
final Channel channel) {
+ this.manager = manager;
+
this.session = session;
+ session.addCloseable((boolean failed) -> clearLargeMessage());
+
this.storageManager = storageManager;
this.channel = channel;
@@ -159,6 +171,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
+ private void clearLargeMessage() {
+ if (currentLargeMessage != null) {
+ try {
+ currentLargeMessage.deleteFile();
+ } catch (Throwable error) {
+ ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+ }
+ }
+ }
+
public ServerSession getSession() {
return session;
}
@@ -469,7 +491,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_SEND: {
SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
- session.send((ServerMessage) message.getMessage(), direct);
+ message.getMessage().setProtocol(manager);
+ session.send(message.getMessage(), direct);
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -477,13 +500,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
case SESS_SEND_LARGE: {
SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
- session.sendLarge(message.getLargeMessage());
+ sendLarge(message.getLargeMessage());
break;
}
case SESS_SEND_CONTINUATION: {
SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
requiresResponse = message.isRequiresResponse();
- session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+ sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -681,4 +704,53 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return serverLastReceivedCommandID;
}
+
+ // Large Message is part of the core protocol, we have these functions here as part of Packet handler
+ private void sendLarge(final Message message) throws Exception {
+ // The LargeMessage must be created before continuing
+ long id = storageManager.generateID();
+
+ LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("sendLarge::" + largeMsg);
+ }
+
+ if (currentLargeMessage != null) {
+ ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
+ }
+
+ currentLargeMessage = largeMsg;
+ }
+
+
+
+ private void sendContinuations(final int packetSize,
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception {
+ if (currentLargeMessage == null) {
+ throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
+ }
+
+ // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+ // of the message
+
+ currentLargeMessage.addBytes(body);
+
+ if (!continues) {
+ currentLargeMessage.releaseResources();
+
+ if (messageBodySize >= 0) {
+ currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+ }
+
+
+ session.doSend(session.getCurrentTransaction(), currentLargeMessage, false, false);
+
+ currentLargeMessage = null;
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index e6595a5..919d84e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -168,7 +168,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler);
// TODO - where is this removed?
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7fed534..7560917 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -21,7 +21,10 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -29,10 +32,21 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+ public static final byte ID = 1;
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
private static final String MODULE_NAME = "artemis-server";
+ @Override
+ public byte getStoreID() {
+ return ID;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return CoreMessagePersister.getInstance();
+ }
+
/**
* {@inheritDoc} *
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index a6c73eb..3a09e91 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -66,7 +66,7 @@ public final class CoreSessionCallback implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference ref,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
@@ -92,7 +92,9 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
+
+ // TODO-now: fix this
Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
int size = 0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index 89d2863..8d22fab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
@@ -36,7 +36,9 @@ public final class ReplicationAddMessage extends PacketImpl {
private byte journalRecordType;
- private EncodingSupport encodingData;
+ private Persister persister;
+
+ private Object encodingData;
private byte[] recordData;
@@ -48,12 +50,14 @@ public final class ReplicationAddMessage extends PacketImpl {
final ADD_OPERATION_TYPE operation,
final long id,
final byte journalRecordType,
- final EncodingSupport encodingData) {
+ final Persister persister,
+ final Object encodingData) {
this();
this.journalID = journalID;
this.operation = operation;
this.id = id;
this.journalRecordType = journalRecordType;
+ this.persister = persister;
this.encodingData = encodingData;
}
@@ -66,8 +70,8 @@ public final class ReplicationAddMessage extends PacketImpl {
buffer.writeBoolean(operation.toBoolean());
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
+ buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.encode(buffer, encodingData);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index 59475e0..925181b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
@@ -36,7 +36,9 @@ public class ReplicationAddTXMessage extends PacketImpl {
private byte recordType;
- private EncodingSupport encodingData;
+ private Persister persister;
+
+ private Object encodingData;
private byte[] recordData;
@@ -51,7 +53,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
final long txId,
final long id,
final byte recordType,
- final EncodingSupport encodingData) {
+ final Persister persister,
+ final Object encodingData) {
this();
this.journalID = journalID;
this.operation = operation;
@@ -70,8 +73,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
+ buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.encode(buffer, encodingData);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index 7307151..b88e0fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -48,7 +48,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
pageNumber = buffer.readInt();
- pagedMessage = new PagedMessageImpl();
+ pagedMessage = new PagedMessageImpl(null);
pagedMessage.decode(buffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index ea3107c..c5318e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -17,12 +17,14 @@
package org.apache.activemq.artemis.core.remoting.server;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -65,6 +67,8 @@ public interface RemotingService {
boolean isStarted();
+ Map<String, ProtocolManagerFactory> getProtocolFactoryMap();
+
/**
* Allow acceptors to use this as their default security Principal if applicable.
* <p>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 50bc90d..3e15f3e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -147,7 +148,9 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
this.scheduledThreadPool = scheduledThreadPool;
CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory();
- //i know there is only 1
+
+ MessagePersister.getInstance().registerProtocol(coreProtocolManagerFactory);
+
this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
@@ -174,6 +177,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
@Override
+ public Map<String, ProtocolManagerFactory> getProtocolFactoryMap() {
+ return protocolMap;
+ }
+
+ @Override
public synchronized void start() throws Exception {
if (started) {
return;
@@ -768,6 +776,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
*/
private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) {
for (ProtocolManagerFactory next : protocolManagerFactoryCollection) {
+ MessagePersister.registerProtocol(next);
String[] protocols = next.getProtocols();
for (String protocol : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index d70316f..0731e8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -88,13 +89,14 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ Persister persister,
+ final Object record,
final boolean sync) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+ localJournal.appendAddRecord(id, recordType, persister, record, sync);
}
/**
@@ -108,14 +110,15 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+ localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
}
/**
@@ -146,12 +149,13 @@ public class ReplicatedJournal implements Journal {
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record TXid = " + id + " recordType = " + recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, record);
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
+ localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
}
/**
@@ -354,26 +358,28 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, record);
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
+ localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
}
@Override
public void appendUpdateRecord(final long id,
final byte journalRecordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, record);
- localJournal.appendUpdateRecord(id, journalRecordType, record, sync, completionCallback);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
+ localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
}
/**
@@ -404,12 +410,13 @@ public class ReplicatedJournal implements Journal {
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, record);
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record);
}
/**
@@ -437,15 +444,6 @@ public class ReplicatedJournal implements Journal {
}
/**
- * @param pages
- * @see org.apache.activemq.artemis.core.journal.Journal#perfBlast(int)
- */
- @Override
- public void perfBlast(final int pages) {
- localJournal.perfBlast(pages);
- }
-
- /**
* @throws Exception
* @see org.apache.activemq.artemis.core.server.ActiveMQComponent#start()
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1a07adc..e82d38e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -76,7 +77,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@@ -651,8 +652,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception {
PagedMessage pgdMessage = packet.getPagedMessage();
pgdMessage.initMessage(storageManager);
- ServerMessage msg = pgdMessage.getMessage();
- Page page = getPage(msg.getAddress(), packet.getPageNumber());
+ Message msg = pgdMessage.getMessage();
+ Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber());
page.write(pgdMessage);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d0468d1..dce5990 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -147,9 +148,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
final ADD_OPERATION_TYPE operation,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, record));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
}
}
@@ -164,9 +166,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, record));
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6ee844b..e72d5d9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
@@ -577,12 +578,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void ioErrorAddingReferences(Integer errorCode, String errorMessage);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222058, value = "Duplicate message detected through the bridge - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
- void duplicateMessageDetectedThruBridge(ServerMessage message);
-
- @LogMessage(level = Logger.Level.WARN)
@Message(id = 222059, value = "Duplicate message detected - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
- void duplicateMessageDetected(ServerMessage message);
+ void duplicateMessageDetected(org.apache.activemq.artemis.api.core.Message message);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222060, value = "Error while confirming large message completion on rollback for recordID={0}", format = Message.Format.MESSAGE_FORMAT)
@@ -783,7 +780,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222110, value = "no queue IDs defined!, originalMessage = {0}, copiedMessage = {1}, props={2}",
format = Message.Format.MESSAGE_FORMAT)
- void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName);
+ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName);
@LogMessage(level = Logger.Level.TRACE)
@Message(id = 222111, value = "exception while invoking {0} on {1}",
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
index 0e38634..1ede0ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.core.server;
+import org.apache.activemq.artemis.api.core.Message;
+
public interface Bindable {
- void route(ServerMessage message, RoutingContext context) throws Exception;
+ void route(Message message, RoutingContext context) throws Exception;
- void routeWithAck(ServerMessage message, RoutingContext context) throws Exception;
+ void routeWithAck(Message message, RoutingContext context) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2a16ed2..aa58a7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
-public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {
+public interface LargeServerMessage extends ReplicatedLargeMessage, Message {
@Override
void addBytes(byte[] bytes) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index a1e6a20..799b0b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
/**
@@ -26,9 +29,14 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public interface MessageReference {
+ final class Factory {
+ public static MessageReference createReference(Message encode, final Queue queue) {
+ return new MessageReferenceImpl(encode, queue);
+ }
+ }
boolean isPaged();
- ServerMessage getMessage();
+ Message getMessage();
/**
* We define this method aggregation here because on paging we need to hold the original estimate,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ae377bb..d7b70a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -197,7 +198,7 @@ public interface Queue extends Bindable {
void cancelRedistributor() throws Exception;
- boolean hasMatchingConsumer(ServerMessage message);
+ boolean hasMatchingConsumer(Message message);
Collection<Consumer> getConsumers();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
deleted file mode 100644
index 40dc50f..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-
-/**
- * A ServerMessage
- */
-public interface ServerMessage extends MessageInternal, EncodingSupport {
-
- ServerMessage setMessageID(long id);
-
- MessageReference createReference(Queue queue);
-
- /**
- * This will force encoding of the address, and will re-check the buffer
- * This is to avoid setMessageTransient which set the address without changing the buffer
- *
- * @param address
- */
- void forceAddress(SimpleString address);
-
- int incrementRefCount() throws Exception;
-
- int decrementRefCount() throws Exception;
-
- int incrementDurableRefCount();
-
- int decrementDurableRefCount();
-
- ServerMessage copy(long newID);
-
- ServerMessage copy();
-
- int getMemoryEstimate();
-
- int getRefCount();
-
- ServerMessage makeCopyForExpiryOrDLA(long newID,
- MessageReference originalReference,
- boolean expiry,
- boolean copyOriginalHeaders) throws Exception;
-
- void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry);
-
- void setPagingStore(PagingStore store);
-
- PagingStore getPagingStore();
-
- // Is there any _AMQ_ property being used
- boolean hasInternalProperties();
-
- boolean storeIsPaging();
-
- void encodeMessageIDToBuffer();
-
- byte[] getDuplicateIDBytes();
-
- Object getDuplicateProperty();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index f4e2ec7..1899d65 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -21,10 +21,11 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Set;
+import org.apache.activemq.artemis.Closeable;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
@@ -99,6 +100,8 @@ public interface ServerSession extends SecurityAuth {
void stop();
+ void addCloseable(Closeable closeable);
+
/**
* To be used by protocol heads that needs to control the transaction outside the session context.
*/
@@ -178,18 +181,19 @@ public interface ServerSession extends SecurityAuth {
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
-
RoutingStatus send(Transaction tx,
- ServerMessage message,
+ Message message,
boolean direct,
boolean noAutoCreateQueue) throws Exception;
- RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
+ RoutingStatus doSend(final Transaction tx,
+ final Message msg,
+ final boolean direct,
+ final boolean noAutoCreateQueue) throws Exception;
- RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
+ RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
- void sendLarge(MessageInternal msg) throws Exception;
+ RoutingStatus send(Message message, boolean direct) throws Exception;
void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
@@ -249,7 +253,9 @@ public interface ServerSession extends SecurityAuth {
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
- SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+ SimpleString getMatchingQueue(SimpleString address,
+ SimpleString queueName,
+ RoutingType routingType) throws Exception;
AddressInfo getAddress(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
index 1583f2c..48f4aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
public interface Transformer {
- ServerMessage transform(ServerMessage message);
+ Message transform(Message message);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ee549c5..18a1f38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -46,14 +46,14 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -499,16 +499,16 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(final ServerMessage message) {
+ protected Message beforeForward(final Message message) {
if (useDuplicateDetection) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+ message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (transformer != null) {
- final ServerMessage transformedMessage = transformer.transform(message);
+ final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {
if (logger.isDebugEnabled()) {
logger.debug("The transformer " + transformer +
@@ -556,7 +556,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refs.put(ref.getMessage().getMessageID(), ref);
}
- final ServerMessage message = beforeForward(ref.getMessage());
+ final Message message = beforeForward(ref.getMessage());
final SimpleString dest;
@@ -564,7 +564,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
dest = forwardingAddress;
} else {
// Preserve the original address
- dest = message.getAddress();
+ dest = message.getAddressSimpleString();
}
pendingAcks.countUp();
@@ -686,7 +686,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
* @param message
* @return
*/
- private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, ServerMessage message) {
+ private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, Message message) {
// if we failover during send then there is a chance that the
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
@@ -697,6 +697,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
try {
+ // TODO-now: replace this
producer.send(dest, message);
} catch (final ActiveMQException e) {
ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f16d863..524bb08 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -36,12 +37,11 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@@ -113,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
this.discoveryLocator = discoveryLocator;
- idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+ idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(name);
this.clusterConnection = clusterConnection;
@@ -150,13 +150,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
- protected ServerMessage beforeForward(final ServerMessage message) {
+ protected Message beforeForward(final Message message) {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- ServerMessage messageCopy = message.copy();
+ Message messageCopy = message.copy();
if (logger.isTraceEnabled()) {
logger.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
@@ -175,12 +175,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
for (SimpleString propName : propNames) {
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+ if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
messageCopy.removeProperty(propName);
}
}
- messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index c585405..e9477a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -139,7 +138,7 @@ public class Redistributor implements Consumer {
final Transaction tx = new TransactionImpl(storageManager);
- final Pair<RoutingContext, ServerMessage> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
+ final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
return HandleStatus.BUSY;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 8f54b2a..9803433 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -23,15 +23,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.jboss.logging.Logger;
@@ -88,7 +88,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
queueFilter = FilterImpl.createFilter(filterString);
- idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
+ idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName);
this.distance = distance;
}
@@ -149,7 +149,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public synchronized boolean isHighAcceptPriority(final ServerMessage message) {
+ public synchronized boolean isHighAcceptPriority(final Message message) {
if (consumerCount == 0) {
return false;
}
@@ -172,7 +172,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public void route(final ServerMessage message, final RoutingContext context) {
+ public void route(final Message message, final RoutingContext context) {
addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -185,7 +185,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public void routeWithAck(ServerMessage message, RoutingContext context) {
+ public void routeWithAck(Message message, RoutingContext context) {
addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -315,7 +315,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
*
* @param message
*/
- private void addRouteContextToMessage(final ServerMessage message) {
+ private void addRouteContextToMessage(final Message message) {
byte[] ids = message.getBytesProperty(idsHeaderName);
if (ids == null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aa1ebf3..38500b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2622,6 +2622,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void addProtocolManagerFactory(ProtocolManagerFactory factory) {
protocolManagerFactories.add(factory);
+ new Exception("protocol....").printStackTrace();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 619036d..bd3f303 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
+
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -25,7 +26,6 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.jboss.logging.Logger;
@@ -83,7 +83,7 @@ public class DivertImpl implements Divert {
}
@Override
- public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+ public void route(final Message message, final RoutingContext context) throws Exception {
// We must make a copy of the message, otherwise things like returning credits to the page won't work
// properly on ack, since the original address will be overwritten
@@ -91,7 +91,7 @@ public class DivertImpl implements Divert {
logger.trace("Diverting message " + message + " into " + this);
}
- ServerMessage copy = null;
+ Message copy = null;
// Shouldn't copy if it's not routed anywhere else
if (!forwardAddress.equals(context.getAddress())) {
@@ -99,7 +99,7 @@ public class DivertImpl implements Divert {
copy = message.copy(id);
// This will set the original MessageId, and the original address
- copy.setOriginalHeaders(message, null, false);
+ copy.referenceOriginalMessage(message, null);
copy.setAddress(forwardAddress);
@@ -130,7 +130,7 @@ public class DivertImpl implements Divert {
}
@Override
- public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
+ public void routeWithAck(Message message, RoutingContext context) throws Exception {
route(message, context);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
index 40cef50..4adb1b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
@@ -20,6 +20,7 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Map;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.Journal;
@@ -29,7 +30,6 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -43,7 +43,7 @@ public interface JournalLoader {
void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;
- void handleNoMessageReferences(Map<Long, ServerMessage> messages);
+ void handleNoMessageReferences(Map<Long, Message> messages);
void handleGroupingBindings(List<GroupingInfo> groupingInfos);
@@ -53,7 +53,7 @@ public interface JournalLoader {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
- void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception;
+ void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception;
void handlePreparedAcknowledge(long messageID,
List<MessageReference> referencesToAck,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index eb467ae..ab6ab62 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.Message;
+
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -31,7 +32,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -74,7 +74,7 @@ public class LastValueQueue extends QueueImpl {
return;
}
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
if (prop != null) {
HolderReference hr = map.get(prop);
@@ -98,7 +98,7 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
if (prop != null) {
HolderReference hr = map.get(prop);
@@ -148,7 +148,7 @@ public class LastValueQueue extends QueueImpl {
@Override
protected void refRemoved(MessageReference ref) {
synchronized (this) {
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
if (prop != null) {
map.remove(prop);
@@ -223,7 +223,7 @@ public class LastValueQueue extends QueueImpl {
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return ref.getMessage();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 6d9030e..bffb1ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -18,11 +18,10 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.MemorySize;
/**
* Implementation of a MessageReference
@@ -35,7 +34,7 @@ public class MessageReferenceImpl implements MessageReference {
private volatile long scheduledDeliveryTime;
- private final ServerMessage message;
+ private final Message message;
private final Queue queue;
@@ -47,20 +46,7 @@ public class MessageReferenceImpl implements MessageReference {
// Static --------------------------------------------------------
- private static final int memoryOffset;
-
- static {
- // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
- // Note, it is only an estimate, it's not possible to be entirely sure with Java
- // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
- // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
- if (MemorySize.is64bitArch()) {
- memoryOffset = 48;
- } else {
- memoryOffset = 32;
- }
- }
+ private static final int memoryOffset = 64;
// Constructors --------------------------------------------------
@@ -80,7 +66,7 @@ public class MessageReferenceImpl implements MessageReference {
this.queue = queue;
}
- protected MessageReferenceImpl(final ServerMessage message, final Queue queue) {
+ public MessageReferenceImpl(final Message message, final Queue queue) {
this.message = message;
this.queue = queue;
@@ -155,7 +141,7 @@ public class MessageReferenceImpl implements MessageReference {
}
@Override
- public ServerMessage getMessage() {
+ public Message getMessage() {
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 005a994..717e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -233,8 +232,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
@Override
- public void handleNoMessageReferences(Map<Long, ServerMessage> messages) {
- for (ServerMessage msg : messages.values()) {
+ public void handleNoMessageReferences(Map<Long, Message> messages) {
+ for (Message msg : messages.values()) {
if (msg.getRefCount() == 0) {
ActiveMQServerLogger.LOGGER.journalUnreferencedMessage(msg.getMessageID());
try {
@@ -284,7 +283,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
@Override
- public void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception {
+ public void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception {
Queue queue = queues.get(queueID);
if (queue == null) {