You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:27 UTC
[09/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2943f15..f0f8e97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -49,7 +49,6 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -440,12 +438,12 @@ public class QueueImpl implements Queue {
}
@Override
- public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+ public void route(final Message message, final RoutingContext context) throws Exception {
context.addQueue(address, this);
}
@Override
- public void routeWithAck(ServerMessage message, RoutingContext context) {
+ public void routeWithAck(Message message, RoutingContext context) {
context.addQueueWithAck(address, this);
}
@@ -922,7 +920,7 @@ public class QueueImpl implements Queue {
}
@Override
- public boolean hasMatchingConsumer(final ServerMessage message) {
+ public boolean hasMatchingConsumer(final Message message) {
for (ConsumerHolder holder : consumerList) {
Consumer consumer = holder.consumer;
@@ -1055,7 +1053,7 @@ public class QueueImpl implements Queue {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref);
} else {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
boolean durableRef = message.isDurable() && durable;
@@ -1087,7 +1085,7 @@ public class QueueImpl implements Queue {
getRefsOperation(tx).addAck(ref);
} else {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
boolean durableRef = message.isDurable() && durable;
@@ -1111,7 +1109,7 @@ public class QueueImpl implements Queue {
@Override
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
if (message.isDurable() && durable) {
tx.setContainsPersistent();
@@ -1216,11 +1214,11 @@ public class QueueImpl implements Queue {
return expiryAddress;
}
- private SimpleString extractAddress(ServerMessage message) {
- if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
- return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+ private SimpleString extractAddress(Message message) {
+ if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+ return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString());
} else {
- return message.getAddress();
+ return message.getAddressSimpleString();
}
}
@@ -1244,7 +1242,7 @@ public class QueueImpl implements Queue {
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
if (scheduledMessages != null && scheduledMessages.size() > 0) {
for (MessageReference ref : scheduledMessages) {
- ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
+ ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime());
ref.setScheduledDeliveryTime(0);
}
this.addHead(scheduledMessages, true);
@@ -2274,7 +2272,7 @@ public class QueueImpl implements Queue {
public boolean checkRedelivery(final MessageReference reference,
final long timeBase,
final boolean ignoreRedeliveryDelay) throws Exception {
- ServerMessage message = reference.getMessage();
+ Message message = reference.getMessage();
if (internalQueue) {
if (logger.isTraceEnabled()) {
@@ -2337,7 +2335,7 @@ public class QueueImpl implements Queue {
final boolean expiry,
final boolean rejectDuplicate,
final long... queueIDs) throws Exception {
- ServerMessage copyMessage = makeCopy(ref, expiry);
+ Message copyMessage = makeCopy(ref, expiry);
copyMessage.setAddress(toAddress);
@@ -2346,7 +2344,7 @@ public class QueueImpl implements Queue {
for (long id : queueIDs) {
buffer.putLong(id);
}
- copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
}
postOffice.route(copyMessage, tx, false, rejectDuplicate);
@@ -2358,7 +2356,7 @@ public class QueueImpl implements Queue {
private void moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
final MessageReference ref) throws Exception {
- ServerMessage copyMessage = makeCopy(ref, false, false);
+ Message copyMessage = makeCopy(ref, false, false);
byte[] oldRouteToIDs = null;
String targetNodeID;
@@ -2366,8 +2364,8 @@ public class QueueImpl implements Queue {
// remove the old route
for (SimpleString propName : copyMessage.getPropertyNames()) {
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
- oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName);
+ if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
+ oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName.toString());
final String hashcodeToString = oldRouteToIDs.toString(); // don't use Arrays.toString(..) here
logger.debug("Removed property from message: " + propName + " = " + hashcodeToString + " (" + ByteBuffer.wrap(oldRouteToIDs).getLong() + ")");
@@ -2420,7 +2418,7 @@ public class QueueImpl implements Queue {
}
private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix,
- ServerMessage copyMessage,
+ Message copyMessage,
long oldQueueID) {
String targetNodeID = null;
Binding targetBinding = null;
@@ -2440,7 +2438,7 @@ public class QueueImpl implements Queue {
// parse the queue name of the remote queue binding to determine the node ID
String temp = remoteQueueBinding.getQueue().getName().toString();
targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
- logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
+ logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
// now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
for (Map.Entry<SimpleString, Binding> entry2 : postOffice.getAllBindings().entrySet()) {
@@ -2468,14 +2466,14 @@ public class QueueImpl implements Queue {
return new Pair<>(targetNodeID, targetBinding);
}
- private ServerMessage makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
+ private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
return makeCopy(ref, expiry, true);
}
- private ServerMessage makeCopy(final MessageReference ref,
+ private Message makeCopy(final MessageReference ref,
final boolean expiry,
final boolean copyOriginalHeaders) throws Exception {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
/*
We copy the message and send that to the dla/expiry queue - this is
because otherwise we may end up with a ref with the same message id in the
@@ -2487,7 +2485,15 @@ public class QueueImpl implements Queue {
long newID = storageManager.generateID();
- ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders);
+ Message copy = message.copy(newID);
+
+ if (copyOriginalHeaders) {
+ copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null);
+ }
+
+ if (expiry) {
+ copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis());
+ }
return copy;
}
@@ -2549,7 +2555,7 @@ public class QueueImpl implements Queue {
tx = new TransactionImpl(storageManager);
}
- ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
+ Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
copyMessage.setAddress(address);
@@ -2719,7 +2725,7 @@ public class QueueImpl implements Queue {
return;
}
- ServerMessage message;
+ Message message;
try {
message = ref.getMessage();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 8e3a94b..0f3da07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,12 +22,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -122,7 +122,7 @@ public class RefsOperation extends TransactionOperationAbstract {
try {
Transaction ackedTX = new TransactionImpl(storageManager);
for (MessageReference ref : ackedRefs) {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
if (message.isDurable()) {
int durableRefCount = message.incrementDurableRefCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index a5f96b1..4590c0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -193,7 +191,7 @@ public class ScaleDownHandler {
buffer.putLong(queueID);
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
if (logger.isDebugEnabled()) {
if (messageReference.isPaged()) {
@@ -264,11 +262,11 @@ public class ScaleDownHandler {
byte[] oldRouteToIDs = null;
List<SimpleString> propertiesToRemove = new ArrayList<>();
- message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+ message.removeProperty(Message.HDR_ROUTE_TO_IDS.toString());
for (SimpleString propName : message.getPropertyNames()) {
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+ if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
if (propName.toString().endsWith(propertyEnd)) {
- oldRouteToIDs = message.getBytesProperty(propName);
+ oldRouteToIDs = message.getBytesProperty(propName.toString());
}
propertiesToRemove.add(propName);
}
@@ -277,16 +275,17 @@ public class ScaleDownHandler {
// TODO: what if oldRouteToIDs == null ??
for (SimpleString propertyToRemove : propertiesToRemove) {
- message.removeProperty(propertyToRemove);
+ message.removeProperty(propertyToRemove.toString());
}
if (queueOnTarget) {
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), oldRouteToIDs);
} else {
- message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
+ message.putBytesProperty(Message.HDR_SCALEDOWN_TO_IDS.toString(), oldRouteToIDs);
}
logger.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
+
producer.send(message.getAddress(), message);
messageCount++;
@@ -322,13 +321,13 @@ public class ScaleDownHandler {
List<TransactionOperation> allOperations = transaction.getAllOperations();
// Get the information of the Prepared TXs so it could replay the TXs
- Map<ServerMessage, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
+ Map<Message, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
for (TransactionOperation operation : allOperations) {
if (operation instanceof PostOfficeImpl.AddOperation) {
PostOfficeImpl.AddOperation addOperation = (PostOfficeImpl.AddOperation) operation;
List<MessageReference> refs = addOperation.getRelatedMessageReferences();
for (MessageReference ref : refs) {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
Queue queue = ref.getQueue();
long queueID;
String queueName = queue.getName().toString();
@@ -336,7 +335,7 @@ public class ScaleDownHandler {
if (queueIDs.containsKey(queueName)) {
queueID = queueIDs.get(queueName);
} else {
- queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+ queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time
}
Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -350,7 +349,7 @@ public class ScaleDownHandler {
RefsOperation refsOperation = (RefsOperation) operation;
List<MessageReference> refs = refsOperation.getReferencesToAcknowledge();
for (MessageReference ref : refs) {
- ServerMessage message = ref.getMessage();
+ Message message = ref.getMessage();
Queue queue = ref.getQueue();
long queueID;
String queueName = queue.getName().toString();
@@ -358,7 +357,7 @@ public class ScaleDownHandler {
if (queueIDs.containsKey(queueName)) {
queueID = queueIDs.get(queueName);
} else {
- queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+ queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time
}
Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -373,23 +372,23 @@ public class ScaleDownHandler {
}
ClientProducer producer = session.createProducer();
- for (Map.Entry<ServerMessage, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
+ for (Map.Entry<Message, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
List<Long> ids = entry.getValue().getA();
ByteBuffer buffer = ByteBuffer.allocate(ids.size() * 8);
for (Long id : ids) {
buffer.putLong(id);
}
- ServerMessage message = entry.getKey();
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ Message message = entry.getKey();
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
ids = entry.getValue().getB();
if (ids.size() > 0) {
buffer = ByteBuffer.allocate(ids.size() * 8);
for (Long id : ids) {
buffer.putLong(id);
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS, buffer.array());
+ message.putBytesProperty(Message.HDR_ROUTE_TO_ACK_IDS.toString(), buffer.array());
}
- producer.send(message.getAddress(), message);
+ producer.send(message.getAddressSimpleString().toString(), message);
}
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index bcc6df1..710a22b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -31,12 +31,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -205,7 +206,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
-
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
@@ -341,7 +341,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
return HandleStatus.BUSY;
}
- final ServerMessage message = ref.getMessage();
+ final Message message = ref.getMessage();
if (filter != null && !filter.match(message)) {
if (logger.isTraceEnabled()) {
@@ -400,7 +400,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void proceedDeliver(MessageReference reference) throws Exception {
try {
- ServerMessage message = reference.getMessage();
+ Message message = reference.getMessage();
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
@@ -507,17 +507,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* there are no other messages to be delivered.
*/
@Override
- public void forceDelivery(final long sequence) {
- forceDelivery(sequence, new Runnable() {
- @Override
- public void run() {
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+ public void forceDelivery(final long sequence) {
+ forceDelivery(sequence, () -> {
+ Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
- callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
- }
});
}
@@ -1018,7 +1016,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* @param ref
* @param message
*/
- private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) {
+ private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException {
int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
if (availableCredits != null) {
@@ -1070,7 +1068,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
*/
private long positionPendingLargeMessage;
- private BodyEncoder context;
+ private LargeBodyEncoder context;
private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
largeMessage = message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
deleted file mode 100644
index 39e77ca..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.impl;
-
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.MemorySize;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public class ServerMessageImpl extends MessageImpl implements ServerMessage {
-
- private final AtomicInteger durableRefCount = new AtomicInteger();
-
- private final AtomicInteger refCount = new AtomicInteger();
-
- private PagingStore pagingStore;
-
- private static final int memoryOffset;
-
- private boolean persisted = false;
-
- static {
- // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
- // Note, it is only an estimate, it's not possible to be entirely sure with Java
- // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
- // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
- if (MemorySize.is64bitArch()) {
- memoryOffset = 352;
- } else {
- memoryOffset = 232;
- }
- }
-
- /*
- * Constructor for when reading from network
- */
- public ServerMessageImpl() {
- }
-
- /*
- * Construct a MessageImpl from storage, or notification, or before routing
- */
- public ServerMessageImpl(final long messageID, final int initialMessageBufferSize) {
- super(initialMessageBufferSize);
-
- this.messageID = messageID;
- }
-
- /*
- * Copy constructor
- */
- protected ServerMessageImpl(final ServerMessageImpl other) {
- super(other);
- }
-
- /*
- * Copy constructor
- */
- protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties) {
- super(other, properties);
- }
-
- @Override
- public boolean isServerMessage() {
- return true;
- }
-
- @Override
- public ServerMessageImpl setMessageID(final long id) {
- messageID = id;
- return this;
- }
-
- @Override
- public MessageReference createReference(final Queue queue) {
- MessageReference ref = new MessageReferenceImpl(this, queue);
-
- return ref;
- }
-
- @Override
- public boolean hasInternalProperties() {
- return properties.hasInternalProperties();
- }
-
- @Override
- public int incrementRefCount() throws Exception {
- int count = refCount.incrementAndGet();
-
- if (pagingStore != null) {
- if (count == 1) {
- pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
- } else {
- pagingStore.addSize(MessageReferenceImpl.getMemoryEstimate());
- }
- }
-
- return count;
- }
-
- @Override
- public int decrementRefCount() throws Exception {
- int count = refCount.decrementAndGet();
-
- if (count < 0) {
- // this could happen on paged messages since they are not routed and incrementRefCount is never called
- return count;
- }
-
- if (pagingStore != null) {
- if (count == 0) {
- pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
-
- if (buffer != null) {
- // release the buffer now
- buffer.byteBuf().release();
- }
- } else {
- pagingStore.addSize(-MessageReferenceImpl.getMemoryEstimate());
- }
- }
-
- return count;
- }
-
- @Override
- public int incrementDurableRefCount() {
- return durableRefCount.incrementAndGet();
- }
-
- @Override
- public int decrementDurableRefCount() {
- return durableRefCount.decrementAndGet();
- }
-
- @Override
- public int getRefCount() {
- return refCount.get();
- }
-
- @Override
- public boolean isLargeMessage() {
- return false;
- }
-
- private volatile int memoryEstimate = -1;
-
- @Override
- public int getMemoryEstimate() {
- if (memoryEstimate == -1) {
- memoryEstimate = ServerMessageImpl.memoryOffset + buffer.capacity() + properties.getMemoryOffset();
- }
-
- return memoryEstimate;
- }
-
- @Override
- public ServerMessage copy(final long newID) {
- ServerMessage m = new ServerMessageImpl(this);
-
- m.setMessageID(newID);
-
- return m;
- }
-
- @Override
- public ServerMessage copy() {
- // This is a simple copy, used only to avoid changing original properties
- return new ServerMessageImpl(this);
- }
-
- public ServerMessage makeCopyForExpiryOrDLA(final long newID,
- MessageReference originalReference,
- final boolean expiry) throws Exception {
- return makeCopyForExpiryOrDLA(newID, originalReference, expiry, true);
- }
-
- @Override
- public ServerMessage makeCopyForExpiryOrDLA(final long newID,
- MessageReference originalReference,
- final boolean expiry,
- final boolean copyOriginalHeaders) throws Exception {
- /*
- We copy the message and send that to the dla/expiry queue - this is
- because otherwise we may end up with a ref with the same message id in the
- queue more than once which would barf - this might happen if the same message had been
- expire from multiple subscriptions of a topic for example
- We set headers that hold the original message address, expiry time
- and original message id
- */
-
- ServerMessage copy = copy(newID);
-
- if (copyOriginalHeaders) {
- copy.setOriginalHeaders(this, originalReference, expiry);
- }
-
- return copy;
- }
-
- @Override
- public void setOriginalHeaders(final ServerMessage other,
- final MessageReference originalReference,
- final boolean expiry) {
- SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
-
- if (originalQueue != null) {
- putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
- } else if (originalReference != null) {
- putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
- }
-
- if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
- putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
-
- putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
- } else {
- putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress());
-
- putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID());
- }
-
- // reset expiry
- setExpiration(0);
-
- if (expiry) {
- long actualExpiryTime = System.currentTimeMillis();
-
- putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
- }
-
- bufferValid = false;
- }
-
- @Override
- public void setPagingStore(final PagingStore pagingStore) {
- this.pagingStore = pagingStore;
-
- // On the server side, we reset the address to point to the instance of address in the paging store
- // Otherwise each message would have its own copy of the address String which would take up more memory
- address = pagingStore.getAddress();
- }
-
- @Override
- public synchronized void forceAddress(final SimpleString address) {
- this.address = address;
- bufferValid = false;
- }
-
- @Override
- public PagingStore getPagingStore() {
- return pagingStore;
- }
-
- @Override
- public boolean storeIsPaging() {
- if (pagingStore != null) {
- return pagingStore.isPaging();
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- try {
- return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
- ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
- } catch (Throwable e) {
- return "ServerMessage[messageID=" + messageID + "]";
- }
- }
-
- private static String toDate(long timestamp) {
- if (timestamp == 0) {
- return "0";
- } else {
- return new java.util.Date(timestamp).toString();
- }
-
- }
-
- @Override
- public InputStream getBodyInputStream() {
- return null;
- }
-
- // Encoding stuff
-
- @Override
- public void encodeMessageIDToBuffer() {
- // We first set the message id - this needs to be set on the buffer since this buffer will be re-used
-
- buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID);
- }
-
- @Override
- public byte[] getDuplicateIDBytes() {
- Object duplicateID = getDuplicateProperty();
-
- if (duplicateID == null) {
- return null;
- } else {
- if (duplicateID instanceof SimpleString) {
- return ((SimpleString) duplicateID).getData();
- } else {
- return (byte[]) duplicateID;
- }
- }
- }
-
- @Override
- public Object getDuplicateProperty() {
- return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 52ecda1..97187e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -41,12 +42,10 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -66,14 +65,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -90,7 +88,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
@@ -155,9 +152,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final SimpleString managementAddress;
- // The current currentLargeMessage being processed
- private volatile LargeServerMessage currentLargeMessage;
-
protected final RoutingContext routingContext = new RoutingContextImpl(null);
protected final SessionCallback callback;
@@ -171,7 +165,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final OperationContext context;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<>();
+ protected final Map<SimpleString, Pair<Object, AtomicLong>> targetAddressInfos = new HashMap<>();
private final long creationTime = System.currentTimeMillis();
@@ -187,6 +181,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private Map<SimpleString, RoutingType> prefixes;
+ private Set<Closeable> closeables;
+
public ServerSessionImpl(final String name,
final String username,
final String password,
@@ -273,6 +269,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
+ public void addCloseable(Closeable closeable) {
+ if (closeables == null) {
+ closeables = new HashSet<>();
+ }
+ this.closeables.add(closeable);
+ }
+
+ @Override
public void disableSecurity() {
this.securityEnabled = false;
}
@@ -376,11 +380,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
consumers.clear();
- if (currentLargeMessage != null) {
- try {
- currentLargeMessage.deleteFile();
- } catch (Throwable error) {
- ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+ if (closeables != null) {
+ for (Closeable closeable : closeables) {
+ closeable.close(failed);
}
}
@@ -1272,30 +1274,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
- public void sendLarge(final MessageInternal message) throws Exception {
- // need to create the LargeMessage before continue
- long id = storageManager.generateID();
-
- LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
- if (logger.isTraceEnabled()) {
- logger.trace("sendLarge::" + largeMsg);
- }
-
- if (currentLargeMessage != null) {
- ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
- }
-
- currentLargeMessage = largeMsg;
- }
-
- @Override
- public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception {
+ public RoutingStatus send(final Message message, final boolean direct) throws Exception {
return send(message, direct, false);
}
@Override
- public RoutingStatus send(final ServerMessage message,
+ public RoutingStatus send(final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
@@ -1303,7 +1287,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public RoutingStatus send(Transaction tx,
- final ServerMessage message,
+ final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
@@ -1319,19 +1303,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
-
+ // This will re-encode the message
message.setMessageID(id);
- message.encodeMessageIDToBuffer();
}
if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
- SimpleString address = removePrefix(message.getAddress());
+ SimpleString originalAddress = message.getAddressSimpleString();
+
+ SimpleString address = removePrefix(message.getAddressSimpleString());
// In case the prefix was removed, we also need to update the message
- if (address != message.getAddress()) {
+ if (address != message.getAddressSimpleString()) {
message.setAddress(address);
}
@@ -1340,14 +1325,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
if (address == null) {
- if (message.isDurable()) {
- // We need to force a re-encode when the message gets persisted or when it gets reloaded
- // it will have no address
- message.setAddress(defaultAddress);
- } else {
- // We don't want to force a re-encode when the message gets sent to the consumer
- message.setAddressTransient(defaultAddress);
- }
+ // We don't want to force a re-encode when the message gets sent to the consumer
+ message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
@@ -1359,42 +1338,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
- if (message.getAddress().equals(managementAddress)) {
+ if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
} else {
- result = doSend(tx, message, direct, noAutoCreateQueue);
+ result = doSend(tx, message, originalAddress, direct, noAutoCreateQueue);
}
return result;
}
- @Override
- public void sendContinuations(final int packetSize,
- final long messageBodySize,
- final byte[] body,
- final boolean continues) throws Exception {
- if (currentLargeMessage == null) {
- throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
- }
-
- // Immediately release the credits for the continuations- these don't contribute to the in-memory size
- // of the message
-
- currentLargeMessage.addBytes(body);
-
- if (!continues) {
- currentLargeMessage.releaseResources();
-
- if (messageBodySize >= 0) {
- currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
- }
-
- doSend(tx, currentLargeMessage, false, false);
-
- currentLargeMessage = null;
- }
- }
@Override
public void requestProducerCredits(SimpleString address, final int credits) throws Exception {
@@ -1456,7 +1409,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public String[] getTargetAddresses() {
- Map<SimpleString, Pair<UUID, AtomicLong>> copy = cloneTargetAddresses();
+ Map<SimpleString, Pair<Object, AtomicLong>> copy = cloneTargetAddresses();
Iterator<SimpleString> iter = copy.keySet().iterator();
int num = copy.keySet().size();
String[] addresses = new String[num];
@@ -1470,7 +1423,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public String getLastSentMessageID(String address) {
- Pair<UUID, AtomicLong> value = targetAddressInfos.get(SimpleString.toSimpleString(address));
+ Pair<Object, AtomicLong> value = targetAddressInfos.get(SimpleString.toSimpleString(address));
if (value != null) {
return value.getA().toString();
} else {
@@ -1489,9 +1442,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void describeProducersInfo(JsonArrayBuilder array) throws Exception {
- Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy = cloneTargetAddresses();
+ Map<SimpleString, Pair<Object, AtomicLong>> targetCopy = cloneTargetAddresses();
- for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry : targetCopy.entrySet()) {
+ for (Map.Entry<SimpleString, Pair<Object, AtomicLong>> entry : targetCopy.entrySet()) {
String uuid = null;
if (entry.getValue().getA() != null) {
uuid = entry.getValue().getA().toString();
@@ -1566,14 +1519,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
connectionFailed(me, failedOver);
}
- public void clearLargeMessage() {
- currentLargeMessage = null;
- }
-
private void installJMSHooks() {
}
- private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
+ private Map<SimpleString, Pair<Object, AtomicLong>> cloneTargetAddresses() {
return new HashMap<>(targetAddressInfos);
}
@@ -1588,10 +1537,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
private RoutingStatus handleManagementMessage(final Transaction tx,
- final ServerMessage message,
+ final Message message,
final boolean direct) throws Exception {
try {
- securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this);
+ securityCheck(removePrefix(message.getAddressSimpleString()), CheckType.MANAGE, this);
} catch (ActiveMQException e) {
if (!autoCommitSends) {
tx.markAsRollbackOnly(e);
@@ -1599,9 +1548,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw e;
}
- ServerMessage reply = managementService.handleMessage(message);
+ Message reply = managementService.handleMessage(message);
- SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
+ SimpleString replyTo = message.getReplyTo();
if (replyTo != null) {
// TODO: move this check somewhere else? this is a JMS-specific bit of logic in the core impl
@@ -1612,7 +1561,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
reply.setAddress(replyTo);
- doSend(tx, reply, direct, false);
+ doSend(tx, reply, null, direct, false);
}
return RoutingStatus.OK;
@@ -1669,21 +1618,26 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
theTx.rollback();
}
+ @Override
public RoutingStatus doSend(final Transaction tx,
- final ServerMessage msg,
+ final Message msg,
+ final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
- /**
- * TODO Checking message properties on each message is expensive. Instead we should update the API and Core Packets
- * to add the RoutingType information directly.
- */
- RoutingType routingType = null;
- if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) {
- routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE));
- }
- Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddress(), routingType);
+ RoutingType routingType = msg.getRouteType();
+
+ /* TODO-now: How to address here with AMQP?
+ if (originalAddress != null) {
+ if (originalAddress.toString().startsWith("anycast:")) {
+ routingType = RoutingType.ANYCAST;
+ } else if (originalAddress.toString().startsWith("multicast:")) {
+ routingType = RoutingType.MULTICAST;
+ }
+ } */
+
+ Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
// Consumer
// check the user has write access to this address.
@@ -1707,10 +1661,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = postOffice.route(msg, routingContext, direct);
- Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
+ Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
- targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
+ targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 0222928..29a2e47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -21,6 +21,9 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -41,8 +44,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -128,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
Object[] getResources(Class<?> resourceType);
- ServerMessage handleMessage(ServerMessage message) throws Exception;
+ ICoreMessage handleMessage(Message message) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 55f2aea..f45aea7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -33,7 +33,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.AcceptorControl;
@@ -56,6 +59,7 @@ import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImp
import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -71,13 +75,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -365,9 +366,11 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
- public ServerMessage handleMessage(final ServerMessage message) throws Exception {
+ public ICoreMessage handleMessage(Message message) throws Exception {
+ message = message.toCore();
// a reply message is sent with the result stored in the message body.
- ServerMessage reply = new ServerMessageImpl(storageManager.generateID(), 512);
+ CoreMessage reply = new CoreMessage(storageManager.generateID(), 512);
+ reply.setReplyTo(message.getReplyTo());
String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
if (logger.isDebugEnabled()) {
@@ -631,7 +634,7 @@ public class ManagementServiceImpl implements ManagementService {
long messageID = storageManager.generateID();
- ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
+ Message notificationMessage = new CoreMessage(messageID, 512);
// Notification messages are always durable so the user can choose whether to add a durable queue to
// consume them in
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
index efe4cf9..0ee1b7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
@@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.utils.JsonLoader;
@@ -97,7 +97,7 @@ public abstract class TransactionDetail {
msgJson.add(KEY_MSG_OP_TYPE, opType);
- ServerMessage msg = ref.getMessage().copy();
+ Message msg = ref.getMessage().copy();
msgJson.add(KEY_MSG_TYPE, decodeMessageType(msg));
JsonUtil.addToObject(KEY_MSG_PROPERTIES, decodeMessageProperties(msg), msgJson);
@@ -108,7 +108,7 @@ public abstract class TransactionDetail {
return detailJson.build();
}
- public abstract String decodeMessageType(ServerMessage msg);
+ public abstract String decodeMessageType(Message msg);
- public abstract Map<String, Object> decodeMessageProperties(ServerMessage msg);
+ public abstract Map<String, Object> decodeMessageProperties(Message msg);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
index 4730596..95036da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid;
import java.util.Map;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionDetail;
@@ -31,8 +31,11 @@ public class CoreTransactionDetail extends TransactionDetail {
}
@Override
- public String decodeMessageType(ServerMessage msg) {
- int type = msg.getType();
+ public String decodeMessageType(Message msg) {
+ if (!(msg instanceof ICoreMessage)) {
+ return "N/A";
+ }
+ int type = ((ICoreMessage)msg).getType();
switch (type) {
case Message.DEFAULT_TYPE: // 0
return "Default";
@@ -52,7 +55,7 @@ public class CoreTransactionDetail extends TransactionDetail {
}
@Override
- public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+ public Map<String, Object> decodeMessageProperties(Message msg) {
return msg.toMap();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a342e13..a440e31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -16,12 +16,12 @@
*/
package org.apache.activemq.artemis.spi.core.protocol;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
-// TODO: use this interface properly on OpenWire
-public interface MessageConverter {
+public interface MessageConverter<ProtocolMessage extends Message> {
- ServerMessage inbound(Object messageInbound) throws Exception;
+ ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
- Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception;
+ ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
new file mode 100644
index 0000000..14891f5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.protocol;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.jboss.logging.Logger;
+
+public class MessagePersister implements Persister<Message> {
+
+ private static final Logger logger = Logger.getLogger(MessagePersister.class);
+
+ private static final MessagePersister theInstance = new MessagePersister();
+
+ /** This will be used for reading messages */
+ private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
+
+
+ public static void registerProtocol(ProtocolManagerFactory manager) {
+ Persister<Message> messagePersister = manager.getPersister();
+ if (messagePersister == null) {
+ logger.warn("Cannot find persister for " + manager);
+ } else {
+ registerPersister(manager.getStoreID(), manager.getPersister());
+ }
+ }
+
+ public static void clearPersisters() {
+ protocols.clear();
+ }
+
+ public static void registerPersister(byte recordType, Persister<Message> persister) {
+ protocols.put(recordType, persister);
+ }
+
+ public static MessagePersister getInstance() {
+ return theInstance;
+ }
+
+
+ protected MessagePersister() {
+ }
+
+ protected byte getID() {
+ return (byte)0;
+ }
+
+ @Override
+ public int getEncodeSize(Message record) {
+ return 0;
+ }
+
+
+ /** Subclasses must write the protocol-id as the first byte of the encoded record (decode reads one byte to pick the persister) */
+ @Override
+ public void encode(ActiveMQBuffer buffer, Message record) {
+ buffer.writeByte(getID());
+ }
+
+ @Override
+ public Message decode(ActiveMQBuffer buffer, Message record) {
+ byte protocol = buffer.readByte();
+ Persister<Message> persister = protocols.get(protocol);
+ if (persister == null) {
+ throw new NullPointerException("couldn't find factory for type=" + protocol);
+ }
+ return persister.decode(buffer, record);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 890fbfe..e29d74d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -22,12 +22,14 @@ import java.util.Map;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+/**
+ * Info: ProtocolManager is loaded by {@link org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl#loadProtocolManagerFactories(Iterable)} */
public interface ProtocolManager<P extends BaseInterceptor> {
ProtocolManagerFactory<P> getFactory();
@@ -51,14 +53,6 @@ public interface ProtocolManager<P extends BaseInterceptor> {
boolean isProtocol(byte[] array);
/**
- * Gets the Message Converter towards ActiveMQ Artemis.
- * Notice this being null means no need to convert
- *
- * @return
- */
- MessageConverter getConverter();
-
- /**
* If this protocols accepts connectoins without an initial handshake.
* If true this protocol will be the failback case no other connections are made.
* New designed protocols should always require a handshake. This is only useful for legacy protocols.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index d3b1b2e..9574540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -20,10 +20,25 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
public interface ProtocolManagerFactory<P extends BaseInterceptor> {
+ /** This is used to store the protocol-id on Messages.
+ * Messages are stored in their bare format.
+ * The protocol manager is responsible for encoding and decoding messages.
+ * The caveat here is that the first short-sized bytes need to be this constant. */
+ default byte getStoreID() {
+ return (byte)0;
+ }
+
+ default Persister<Message> getPersister() {
+ return null;
+ }
+
+
/**
* When you create the ProtocolManager, you should filter out any interceptors that won't belong
* to this Protocol.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ee236c7..799e8b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.artemis.spi.core.protocol;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public interface SessionCallback {
@@ -55,10 +55,10 @@ public interface SessionCallback {
// and I wanted to avoid re-fetching paged data in case of GCs on this specific case.
//
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
- int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount);
+ int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumerID,
long bodySize,
int deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0c33a35..6fdef44 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -682,22 +682,6 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="perf-blast-pages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
- <xsd:annotation>
- <xsd:documentation>
- XXX Only meant to be used by project developers
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
-
- <xsd:element name="run-sync-speed-test" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
- <xsd:annotation>
- <xsd:documentation>
- XXX Only meant to be used by project developers
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
-
<xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index f374979..5e9a95a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -77,7 +77,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(), conf.isLogJournalWriteRate());
- Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), conf.getJournalPerfBlastPages());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(), conf.isMessageCounterEnabled());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterMaxDayHistory(), conf.getMessageCounterMaxDayHistory());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterSamplePeriod(), conf.getMessageCounterSamplePeriod());
@@ -232,10 +231,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setLogJournalWriteRate(b);
Assert.assertEquals(b, conf.isLogJournalWriteRate());
- i = RandomUtil.randomInt();
- conf.setJournalPerfBlastPages(i);
- Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
l = RandomUtil.randomLong();
conf.setServerDumpInterval(l);
Assert.assertEquals(l, conf.getServerDumpInterval());
@@ -434,10 +429,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setLogJournalWriteRate(b);
Assert.assertEquals(b, conf.isLogJournalWriteRate());
- i = RandomUtil.randomInt();
- conf.setJournalPerfBlastPages(i);
- Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
l = RandomUtil.randomLong();
conf.setServerDumpInterval(l);
Assert.assertEquals(l, conf.getServerDumpInterval());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
index d73accd..1eb749b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.filter.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.tests.util.SilentTestCase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
@@ -35,13 +35,13 @@ public class FilterTest extends SilentTestCase {
private Filter filter;
- private ServerMessage message;
+ private Message message;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- message = new ServerMessageImpl(1, 1000);
+ message = new CoreMessage().initBuffer(1024).setMessageID(1);
}
@Test
@@ -59,7 +59,7 @@ public class FilterTest extends SilentTestCase {
message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
Assert.assertTrue(filter.match(message));
- message = new ServerMessageImpl();
+ message = new CoreMessage();
Assert.assertFalse(filter.match(message));
}
@@ -94,7 +94,7 @@ public class FilterTest extends SilentTestCase {
filter = FilterImpl.createFilter(new SimpleString("AMQDurable='NON_DURABLE'"));
- message = new ServerMessageImpl();
+ message = new CoreMessage();
message.setDurable(true);
Assert.assertFalse(filter.match(message));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 0e9a3f2..2f18c21 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -23,6 +23,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -43,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -329,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
- public ServerMessage handleMessage(ServerMessage message) throws Exception {
+ public ICoreMessage handleMessage(Message message) throws Exception {
return null;
}