You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 16:50:06 UTC
[05/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2f6ae3d..464859f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,6 +37,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -45,7 +45,7 @@ import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -69,12 +69,9 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -665,20 +662,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public RoutingStatus route(final ServerMessage message,
+ public RoutingStatus route(final Message message,
final boolean direct) throws Exception {
return route(message, (Transaction) null, direct);
}
@Override
- public RoutingStatus route(final ServerMessage message,
+ public RoutingStatus route(final Message message,
final Transaction tx,
final boolean direct) throws Exception {
return route(message, new RoutingContextImpl(tx), direct);
}
@Override
- public RoutingStatus route(final ServerMessage message,
+ public RoutingStatus route(final Message message,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
@@ -686,14 +683,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public RoutingStatus route(final ServerMessage message,
+ public RoutingStatus route(final Message message,
final RoutingContext context,
final boolean direct) throws Exception {
return route(message, context, direct, true);
}
@Override
- public RoutingStatus route(final ServerMessage message,
+ public RoutingStatus route(final Message message,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
@@ -708,7 +705,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX = new AtomicBoolean(false);
- final SimpleString address = message.getAddress();
+ final SimpleString address = message.getAddressSimpleString();
applyExpiryDelay(message, address);
@@ -716,13 +713,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return RoutingStatus.DUPLICATED_ID;
}
- if (message.hasInternalProperties()) {
- // We need to perform some cleanup on internal properties,
- // but we don't do it every time, otherwise it wouldn't be optimal
- cleanupInternalPropertiesBeforeRouting(message);
- }
+ message.cleanupInternalProperties();
- Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress());
+ Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddressSimpleString() : context.getAddress());
// TODO auto-create queues here?
// first check for the auto-queue creation thing
@@ -768,7 +761,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
} else {
- message.setOriginalHeaders(message, null, false);
+ message.referenceOriginalMessage(message, null);
message.setAddress(dlaAddress);
@@ -806,7 +799,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
// HORNETQ-1029
- private void applyExpiryDelay(ServerMessage message, SimpleString address) {
+ private void applyExpiryDelay(Message message, SimpleString address) {
long expirationOverride = addressSettingsRepository.getMatch(address.toString()).getExpiryDelay();
// A -1 <expiry-delay> means don't do anything
@@ -819,12 +812,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public MessageReference reroute(final ServerMessage message,
+ public MessageReference reroute(final Message message,
final Queue queue,
final Transaction tx) throws Exception {
+
setPagingStore(message);
- MessageReference reference = message.createReference(queue);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
@@ -852,15 +846,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
* The redistribution can't process the route right away as we may be dealing with a large message which will need to be processed on a different thread
*/
@Override
- public Pair<RoutingContext, ServerMessage> redistribute(final ServerMessage message,
+ public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
- ServerMessage copyRedistribute = message.copy(storageManager.generateID());
+ Message copyRedistribute = message.copy(storageManager.generateID());
- Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
+ Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
if (bindings != null) {
RoutingContext context = new RoutingContextImpl(tx);
@@ -937,7 +931,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
synchronized (notificationLock) {
// First send a reset message
- ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
+ Message message = new CoreMessage(storageManager.generateID(), 50);
message.setAddress(queueName);
message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
@@ -987,7 +981,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
}
- ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+ Message completeMessage = new CoreMessage(storageManager.generateID(), 50);
completeMessage.setAddress(queueName);
completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true);
@@ -1006,37 +1000,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// Private -----------------------------------------------------------------
- /**
- * @param message
- */
- protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message) {
- LinkedList<SimpleString> valuesToRemove = null;
-
- for (SimpleString name : message.getPropertyNames()) {
- // We use properties to establish routing context on clustering.
- // However if the client resends the message after receiving, it needs to be removed
- if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) || (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))) {
- if (valuesToRemove == null) {
- valuesToRemove = new LinkedList<>();
- }
- valuesToRemove.add(name);
- }
- }
-
- if (valuesToRemove != null) {
- for (SimpleString removal : valuesToRemove) {
- message.removeProperty(removal);
- }
- }
- }
- private void setPagingStore(final ServerMessage message) throws Exception {
- PagingStore store = pagingManager.getPageStore(message.getAddress());
+ private void setPagingStore(final Message message) throws Exception {
+ PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString());
- message.setPagingStore(store);
+ message.setContext(store);
}
- private void routeQueueInfo(final ServerMessage message,
+ private void routeQueueInfo(final Message message,
final Queue queue,
final boolean applyFilters) throws Exception {
if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
@@ -1074,13 +1045,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public void processRoute(final ServerMessage message,
+ public void processRoute(final Message message,
final RoutingContext context,
final boolean direct) throws Exception {
final List<MessageReference> refs = new ArrayList<>();
Transaction tx = context.getTransaction();
+ Long deliveryTime = message.getScheduledDeliveryTime();
+
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());
@@ -1095,14 +1068,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
for (Queue queue : entry.getValue().getNonDurableQueues()) {
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if (deliveryTime != null) {
+ reference.setScheduledDeliveryTime(deliveryTime);
}
+ refs.add(reference);
message.incrementRefCount();
}
@@ -1112,22 +1083,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
while (iter.hasNext()) {
Queue queue = iter.next();
- MessageReference reference = message.createReference(queue);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
- if (context.isAlreadyAcked(message.getAddress(), queue)) {
+ if (context.isAlreadyAcked(message.getAddressSimpleString(), queue)) {
reference.setAlreadyAcked();
if (tx != null) {
queue.acknowledge(tx, reference);
}
}
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if (deliveryTime != null) {
+ reference.setScheduledDeliveryTime(deliveryTime);
}
+ refs.add(reference);
if (message.isDurable()) {
int durableRefCount = message.incrementDurableRefCount();
@@ -1152,7 +1121,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
}
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
+ if (message.containsDeliveryAnnotationProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {
@@ -1189,7 +1158,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
* @param message
* @throws Exception
*/
- private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception {
+ private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception {
LargeServerMessage largeServerMessage = (LargeServerMessage) message;
if (largeServerMessage.getPendingRecordID() >= 0) {
if (tx == null) {
@@ -1245,13 +1214,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
- private boolean checkDuplicateID(final ServerMessage message,
+ private boolean checkDuplicateID(final Message message,
final RoutingContext context,
boolean rejectDuplicates,
AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first
- Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+ Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;
@@ -1269,8 +1238,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.decrementRefCount();
return false;
}
-
- message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
} else {
// if used BridgeDuplicate, it's not going to use the regular duplicate
// since this will would break redistribution (re-setting the duplicateId)
@@ -1281,7 +1248,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
boolean isDuplicate = false;
if (duplicateIDBytes != null) {
- cache = getDuplicateIDCache(message.getAddress());
+ cache = getDuplicateIDCache(message.getAddressSimpleString());
isDuplicate = cache.contains(duplicateIDBytes);
@@ -1338,8 +1305,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
- private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName) {
- ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
+ private Message createQueueInfoMessage(final NotificationType type, final SimpleString queueName) {
+ Message message = new CoreMessage().initBuffer(50).setMessageID(storageManager.generateID());
message.setAddress(queueName);
@@ -1433,7 +1400,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// Reverse the ref counts, and paging sizes
for (MessageReference ref : refs) {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
if (message.isDurable() && ref.getQueue().isDurable()) {
message.decrementDurableRefCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 2869e38..45082b9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
@@ -47,7 +48,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -87,11 +87,11 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
switch (packetType) {
case SESS_SEND: {
- packet = new SessionSendMessage(new ServerMessageImpl());
+ packet = new SessionSendMessage(new CoreMessage());
break;
}
case SESS_SEND_LARGE: {
- packet = new SessionSendLargeMessage(new ServerMessageImpl());
+ packet = new SessionSendLargeMessage(new CoreMessage());
break;
}
case REPLICATION_APPEND: {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index d09f62f..92cae64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -25,9 +25,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
@@ -81,9 +84,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@@ -137,13 +139,23 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private volatile CoreRemotingConnection remotingConnection;
+ private final CoreProtocolManager manager;
+
+ // The current currentLargeMessage being processed
+ private volatile LargeServerMessage currentLargeMessage;
+
private final boolean direct;
- public ServerSessionPacketHandler(final ServerSession session,
+ public ServerSessionPacketHandler(final CoreProtocolManager manager,
+ final ServerSession session,
final StorageManager storageManager,
final Channel channel) {
+ this.manager = manager;
+
this.session = session;
+ session.addCloseable((boolean failed) -> clearLargeMessage());
+
this.storageManager = storageManager;
this.channel = channel;
@@ -159,6 +171,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
+ private void clearLargeMessage() {
+ if (currentLargeMessage != null) {
+ try {
+ currentLargeMessage.deleteFile();
+ } catch (Throwable error) {
+ ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+ }
+ }
+ }
+
public ServerSession getSession() {
return session;
}
@@ -469,7 +491,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_SEND: {
SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
- session.send((ServerMessage) message.getMessage(), direct);
+ session.send(message.getMessage(), direct);
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -477,13 +499,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
case SESS_SEND_LARGE: {
SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
- session.sendLarge(message.getLargeMessage());
+ sendLarge(message.getLargeMessage());
break;
}
case SESS_SEND_CONTINUATION: {
SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
requiresResponse = message.isRequiresResponse();
- session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+ sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -681,4 +703,53 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return serverLastReceivedCommandID;
}
+
+ // Large Message is part of the core protocol, we have these functions here as part of Packet handler
+ private void sendLarge(final Message message) throws Exception {
+ // need to create the LargeMessage before continue
+ long id = storageManager.generateID();
+
+ LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("sendLarge::" + largeMsg);
+ }
+
+ if (currentLargeMessage != null) {
+ ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
+ }
+
+ currentLargeMessage = largeMsg;
+ }
+
+
+
+ private void sendContinuations(final int packetSize,
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception {
+ if (currentLargeMessage == null) {
+ throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
+ }
+
+ // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+ // of the message
+
+ currentLargeMessage.addBytes(body);
+
+ if (!continues) {
+ currentLargeMessage.releaseResources();
+
+ if (messageBodySize >= 0) {
+ currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+ }
+
+
+ session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
+
+ currentLargeMessage = null;
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index e6595a5..919d84e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -168,7 +168,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler);
// TODO - where is this removed?
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index ffaf2cb..cc81fbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -53,9 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -111,16 +110,6 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
return false;
}
- /**
- * no need to implement this now
- *
- * @return
- */
- @Override
- public MessageConverter getConverter() {
- return null;
- }
-
@Override
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
final Configuration config = server.getConfiguration();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7fed534..7560917 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -21,7 +21,10 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -29,10 +32,21 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+ public static final byte ID = 1;
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
private static final String MODULE_NAME = "artemis-server";
+ @Override
+ public byte getStoreID() {
+ return ID;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return CoreMessagePersister.getInstance();
+ }
+
/**
* {@inheritDoc} *
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index a6c73eb..542d726 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -66,7 +66,7 @@ public final class CoreSessionCallback implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference ref,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
@@ -92,8 +92,9 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
- Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
+ public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
+
+ Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
int size = 0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index 89d2863..8d22fab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
@@ -36,7 +36,9 @@ public final class ReplicationAddMessage extends PacketImpl {
private byte journalRecordType;
- private EncodingSupport encodingData;
+ private Persister persister;
+
+ private Object encodingData;
private byte[] recordData;
@@ -48,12 +50,14 @@ public final class ReplicationAddMessage extends PacketImpl {
final ADD_OPERATION_TYPE operation,
final long id,
final byte journalRecordType,
- final EncodingSupport encodingData) {
+ final Persister persister,
+ final Object encodingData) {
this();
this.journalID = journalID;
this.operation = operation;
this.id = id;
this.journalRecordType = journalRecordType;
+ this.persister = persister;
this.encodingData = encodingData;
}
@@ -66,8 +70,8 @@ public final class ReplicationAddMessage extends PacketImpl {
buffer.writeBoolean(operation.toBoolean());
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
+ buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.encode(buffer, encodingData);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index 59475e0..a6fd02b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
@@ -36,7 +36,9 @@ public class ReplicationAddTXMessage extends PacketImpl {
private byte recordType;
- private EncodingSupport encodingData;
+ private Persister persister;
+
+ private Object encodingData;
private byte[] recordData;
@@ -51,7 +53,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
final long txId,
final long id,
final byte recordType,
- final EncodingSupport encodingData) {
+ final Persister persister,
+ final Object encodingData) {
this();
this.journalID = journalID;
this.operation = operation;
@@ -59,6 +62,7 @@ public class ReplicationAddTXMessage extends PacketImpl {
this.id = id;
this.recordType = recordType;
this.encodingData = encodingData;
+ this.persister = persister;
}
// Public --------------------------------------------------------
@@ -70,8 +74,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
+ buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.encode(buffer, encodingData);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index 7307151..b88e0fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -48,7 +48,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
pageNumber = buffer.readInt();
- pagedMessage = new PagedMessageImpl();
+ pagedMessage = new PagedMessageImpl(null);
pagedMessage.decode(buffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index ea3107c..c5318e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -17,12 +17,14 @@
package org.apache.activemq.artemis.core.remoting.server;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -65,6 +67,8 @@ public interface RemotingService {
boolean isStarted();
+ Map<String, ProtocolManagerFactory> getProtocolFactoryMap();
+
/**
* Allow acceptors to use this as their default security Principal if applicable.
* <p>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 50bc90d..3e15f3e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -147,7 +148,9 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
this.scheduledThreadPool = scheduledThreadPool;
CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory();
- //i know there is only 1
+
+ MessagePersister.getInstance().registerProtocol(coreProtocolManagerFactory);
+
this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
@@ -174,6 +177,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
@Override
+ public Map<String, ProtocolManagerFactory> getProtocolFactoryMap() {
+ return protocolMap;
+ }
+
+ @Override
public synchronized void start() throws Exception {
if (started) {
return;
@@ -768,6 +776,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
*/
private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) {
for (ProtocolManagerFactory next : protocolManagerFactoryCollection) {
+ MessagePersister.registerProtocol(next);
String[] protocols = next.getProtocols();
for (String protocol : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index d70316f..0731e8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -88,13 +89,14 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ Persister persister,
+ final Object record,
final boolean sync) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+ localJournal.appendAddRecord(id, recordType, persister, record, sync);
}
/**
@@ -108,14 +110,15 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+ localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
}
/**
@@ -146,12 +149,13 @@ public class ReplicatedJournal implements Journal {
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("Append record TXid = " + id + " recordType = " + recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, record);
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
+ localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
}
/**
@@ -354,26 +358,28 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, record);
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
+ localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
}
@Override
public void appendUpdateRecord(final long id,
final byte journalRecordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, record);
- localJournal.appendUpdateRecord(id, journalRecordType, record, sync, completionCallback);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
+ localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
}
/**
@@ -404,12 +410,13 @@ public class ReplicatedJournal implements Journal {
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (ReplicatedJournal.trace) {
ReplicatedJournal.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, record);
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record);
}
/**
@@ -437,15 +444,6 @@ public class ReplicatedJournal implements Journal {
}
/**
- * @param pages
- * @see org.apache.activemq.artemis.core.journal.Journal#perfBlast(int)
- */
- @Override
- public void perfBlast(final int pages) {
- localJournal.perfBlast(pages);
- }
-
- /**
* @throws Exception
* @see org.apache.activemq.artemis.core.server.ActiveMQComponent#start()
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1a07adc..e82d38e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -76,7 +77,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@@ -651,8 +652,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception {
PagedMessage pgdMessage = packet.getPagedMessage();
pgdMessage.initMessage(storageManager);
- ServerMessage msg = pgdMessage.getMessage();
- Page page = getPage(msg.getAddress(), packet.getPageNumber());
+ Message msg = pgdMessage.getMessage();
+ Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber());
page.write(pgdMessage);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d0468d1..dce5990 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -147,9 +148,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
final ADD_OPERATION_TYPE operation,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, record));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
}
}
@@ -164,9 +166,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, record));
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6ee844b..b79806a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -577,12 +577,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void ioErrorAddingReferences(Integer errorCode, String errorMessage);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222058, value = "Duplicate message detected through the bridge - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
- void duplicateMessageDetectedThruBridge(ServerMessage message);
-
- @LogMessage(level = Logger.Level.WARN)
@Message(id = 222059, value = "Duplicate message detected - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
- void duplicateMessageDetected(ServerMessage message);
+ void duplicateMessageDetected(org.apache.activemq.artemis.api.core.Message message);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222060, value = "Error while confirming large message completion on rollback for recordID={0}", format = Message.Format.MESSAGE_FORMAT)
@@ -783,7 +779,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222110, value = "no queue IDs defined!, originalMessage = {0}, copiedMessage = {1}, props={2}",
format = Message.Format.MESSAGE_FORMAT)
- void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName);
+ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName);
@LogMessage(level = Logger.Level.TRACE)
@Message(id = 222111, value = "exception while invoking {0} on {1}",
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
index 0e38634..1ede0ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.core.server;
+import org.apache.activemq.artemis.api.core.Message;
+
public interface Bindable {
- void route(ServerMessage message, RoutingContext context) throws Exception;
+ void route(Message message, RoutingContext context) throws Exception;
- void routeWithAck(ServerMessage message, RoutingContext context) throws Exception;
+ void routeWithAck(Message message, RoutingContext context) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2a16ed2..6fcc802 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
-public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {
+public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
@Override
void addBytes(byte[] bytes) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index a1e6a20..799b0b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
/**
@@ -26,9 +29,14 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public interface MessageReference {
+ final class Factory {
+ public static MessageReference createReference(Message encode, final Queue queue) {
+ return new MessageReferenceImpl(encode, queue);
+ }
+ }
boolean isPaged();
- ServerMessage getMessage();
+ Message getMessage();
/**
* We define this method aggregation here because on paging we need to hold the original estimate,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ae377bb..d7b70a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -197,7 +198,7 @@ public interface Queue extends Bindable {
void cancelRedistributor() throws Exception;
- boolean hasMatchingConsumer(ServerMessage message);
+ boolean hasMatchingConsumer(Message message);
Collection<Consumer> getConsumers();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
deleted file mode 100644
index 40dc50f..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-
-/**
- * A ServerMessage
- */
-public interface ServerMessage extends MessageInternal, EncodingSupport {
-
- ServerMessage setMessageID(long id);
-
- MessageReference createReference(Queue queue);
-
- /**
- * This will force encoding of the address, and will re-check the buffer
- * This is to avoid setMessageTransient which set the address without changing the buffer
- *
- * @param address
- */
- void forceAddress(SimpleString address);
-
- int incrementRefCount() throws Exception;
-
- int decrementRefCount() throws Exception;
-
- int incrementDurableRefCount();
-
- int decrementDurableRefCount();
-
- ServerMessage copy(long newID);
-
- ServerMessage copy();
-
- int getMemoryEstimate();
-
- int getRefCount();
-
- ServerMessage makeCopyForExpiryOrDLA(long newID,
- MessageReference originalReference,
- boolean expiry,
- boolean copyOriginalHeaders) throws Exception;
-
- void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry);
-
- void setPagingStore(PagingStore store);
-
- PagingStore getPagingStore();
-
- // Is there any _AMQ_ property being used
- boolean hasInternalProperties();
-
- boolean storeIsPaging();
-
- void encodeMessageIDToBuffer();
-
- byte[] getDuplicateIDBytes();
-
- Object getDuplicateProperty();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index f4e2ec7..0ce0728 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -21,10 +21,11 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Set;
+import org.apache.activemq.artemis.Closeable;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
@@ -99,6 +100,8 @@ public interface ServerSession extends SecurityAuth {
void stop();
+ void addCloseable(Closeable closeable);
+
/**
* To be used by protocol heads that needs to control the transaction outside the session context.
*/
@@ -178,18 +181,20 @@ public interface ServerSession extends SecurityAuth {
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
-
RoutingStatus send(Transaction tx,
- ServerMessage message,
+ Message message,
boolean direct,
boolean noAutoCreateQueue) throws Exception;
- RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
+ RoutingStatus doSend(final Transaction tx,
+ final Message msg,
+ final SimpleString originalAddress,
+ final boolean direct,
+ final boolean noAutoCreateQueue) throws Exception;
- RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
+ RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
- void sendLarge(MessageInternal msg) throws Exception;
+ RoutingStatus send(Message message, boolean direct) throws Exception;
void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
@@ -249,7 +254,9 @@ public interface ServerSession extends SecurityAuth {
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
- SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+ SimpleString getMatchingQueue(SimpleString address,
+ SimpleString queueName,
+ RoutingType routingType) throws Exception;
AddressInfo getAddress(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
index 1583f2c..48f4aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
public interface Transformer {
- ServerMessage transform(ServerMessage message);
+ Message transform(Message message);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ee549c5..f7a3540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -46,14 +46,12 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -499,16 +497,16 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(final ServerMessage message) {
+ protected Message beforeForward(final Message message) {
if (useDuplicateDetection) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+ message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (transformer != null) {
- final ServerMessage transformedMessage = transformer.transform(message);
+ final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {
if (logger.isDebugEnabled()) {
logger.debug("The transformer " + transformer +
@@ -556,7 +554,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refs.put(ref.getMessage().getMessageID(), ref);
}
- final ServerMessage message = beforeForward(ref.getMessage());
+ final Message message = beforeForward(ref.getMessage());
final SimpleString dest;
@@ -564,7 +562,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
dest = forwardingAddress;
} else {
// Preserve the original address
- dest = message.getAddress();
+ dest = message.getAddressSimpleString();
}
pendingAcks.countUp();
@@ -686,7 +684,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
* @param message
* @return
*/
- private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, ServerMessage message) {
+ private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, Message message) {
// if we failover during send then there is a chance that the
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
@@ -697,6 +695,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
try {
+ // TODO-now: replace this
producer.send(dest, message);
} catch (final ActiveMQException e) {
ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f16d863..a870ea6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -36,12 +37,10 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@@ -113,7 +112,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
this.discoveryLocator = discoveryLocator;
- idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+ idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(name);
this.clusterConnection = clusterConnection;
@@ -150,13 +149,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
- protected ServerMessage beforeForward(final ServerMessage message) {
+ protected Message beforeForward(final Message message) {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- ServerMessage messageCopy = message.copy();
+ Message messageCopy = message.copy();
if (logger.isTraceEnabled()) {
logger.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
@@ -175,12 +174,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
for (SimpleString propName : propNames) {
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+ if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
messageCopy.removeProperty(propName);
}
}
- messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index c585405..e9477a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -139,7 +138,7 @@ public class Redistributor implements Consumer {
final Transaction tx = new TransactionImpl(storageManager);
- final Pair<RoutingContext, ServerMessage> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
+ final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
return HandleStatus.BUSY;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 8f54b2a..02a7671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -23,15 +23,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.jboss.logging.Logger;
@@ -88,7 +87,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
queueFilter = FilterImpl.createFilter(filterString);
- idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
+ idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName);
this.distance = distance;
}
@@ -149,7 +148,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public synchronized boolean isHighAcceptPriority(final ServerMessage message) {
+ public synchronized boolean isHighAcceptPriority(final Message message) {
if (consumerCount == 0) {
return false;
}
@@ -172,7 +171,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public void route(final ServerMessage message, final RoutingContext context) {
+ public void route(final Message message, final RoutingContext context) {
addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -185,7 +184,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
@Override
- public void routeWithAck(ServerMessage message, RoutingContext context) {
+ public void routeWithAck(Message message, RoutingContext context) {
addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -315,7 +314,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
*
* @param message
*/
- private void addRouteContextToMessage(final ServerMessage message) {
+ private void addRouteContextToMessage(final Message message) {
byte[] ids = message.getBytesProperty(idsHeaderName);
if (ids == null) {