You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:13 UTC
[20/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
ARTEMIS-1009 Pure Message Encoding.
With this we can send and receive messages in their raw format,
without requiring conversion to Core.
- MessageImpl and ServerMessage are removed as part of this
- AMQPMessage and CoreMessage will have the specialized message format for each protocol
- The protocol manager is now responsible for sending the message
- The message will provide an encoder for journal and paging
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64681865
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64681865
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64681865
Branch: refs/heads/artemis-1009
Commit: 64681865090a77dc8ea7fe653379d06a67849ab8
Parents: 503b112
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Feb 20 15:55:15 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500
----------------------------------------------------------------------
.../artemis/cli/commands/tools/PrintData.java | 7 +
.../cli/commands/tools/XmlDataExporter.java | 18 +-
.../cli/commands/tools/XmlDataExporterUtil.java | 7 +-
.../cli/commands/tools/XmlDataImporter.java | 3 +-
.../org/apache/activemq/artemis/Closeable.java | 22 +
.../artemis/api/core/ActiveMQBuffer.java | 13 +
.../artemis/api/core/ActiveMQBuffers.java | 15 +
.../activemq/artemis/api/core/SimpleString.java | 34 +
.../core/buffers/impl/ChannelBufferWrapper.java | 83 +-
.../artemis/core/persistence/Persister.java | 30 +
.../apache/activemq/artemis/utils/ByteUtil.java | 8 +
.../activemq/artemis/utils/TypedProperties.java | 74 +-
.../apache/activemq/artemis/utils/UTF8Util.java | 36 +-
.../artemis/utils/TypedPropertiesTest.java | 10 +-
.../config/ActiveMQDefaultConfiguration.java | 20 -
.../activemq/artemis/api/core/Message.java | 627 +++++-----
.../artemis/api/core/RefCountMessage.java | 81 ++
.../api/core/RefCountMessageListener.java | 31 +
.../artemis/api/core/client/ClientMessage.java | 8 -
.../artemis/api/core/encode/BodyType.java | 22 +
.../artemis/api/core/encode/MessageBody.java | 28 +
.../impl/ResetLimitWrappedActiveMQBuffer.java | 24 +-
.../client/impl/ClientLargeMessageImpl.java | 22 +-
.../core/client/impl/ClientMessageImpl.java | 74 +-
.../core/client/impl/ClientMessageInternal.java | 5 +
.../core/client/impl/ClientProducerImpl.java | 41 +-
.../CompressedLargeMessageControllerImpl.java | 6 +
.../client/impl/LargeMessageControllerImpl.java | 15 +
.../artemis/core/message/BodyEncoder.java | 55 -
.../artemis/core/message/LargeBodyEncoder.java | 55 +
.../artemis/core/message/impl/CoreMessage.java | 1066 ++++++++++++++++++
.../core/message/impl/CoreMessagePersister.java | 66 ++
.../artemis/core/message/impl/MessageImpl.java | 1059 -----------------
.../core/message/impl/MessageInternal.java | 57 -
.../core/impl/ActiveMQSessionContext.java | 16 +-
.../core/protocol/core/impl/ChannelImpl.java | 1 +
.../core/protocol/core/impl/PacketImpl.java | 30 +-
.../core/impl/RemotingConnectionImpl.java | 1 +
.../core/impl/wireformat/MessagePacket.java | 18 +-
.../SessionReceiveClientLargeMessage.java | 5 +-
.../wireformat/SessionReceiveLargeMessage.java | 14 +-
.../impl/wireformat/SessionReceiveMessage.java | 60 +-
.../SessionSendContinuationMessage.java | 8 +-
.../wireformat/SessionSendLargeMessage.java | 12 +-
.../impl/wireformat/SessionSendMessage.java | 54 +-
.../activemq/artemis/reader/MapMessageUtil.java | 4 +-
.../spi/core/remoting/SessionContext.java | 11 +-
.../artemis/message/CoreMessageTest.java | 365 ++++++
.../jdbc/store/journal/JDBCJournalImpl.java | 36 +-
.../jdbc/store/journal/JDBCJournalRecord.java | 7 +-
.../jms/client/ActiveMQBytesMessage.java | 4 +-
.../artemis/jms/client/ActiveMQMessage.java | 8 +-
.../jms/transaction/JMSTransactionDetail.java | 6 +-
.../artemis/core/journal/EncoderPersister.java | 51 +
.../activemq/artemis/core/journal/Journal.java | 55 +-
.../journal/impl/AbstractJournalUpdateTask.java | 3 +-
.../core/journal/impl/FileWrapperJournal.java | 26 +-
.../artemis/core/journal/impl/JournalBase.java | 63 +-
.../core/journal/impl/JournalCompactor.java | 9 +-
.../artemis/core/journal/impl/JournalImpl.java | 62 +-
.../impl/dataformat/JournalAddRecord.java | 20 +-
.../impl/dataformat/JournalAddRecordTX.java | 17 +-
.../protocol/amqp/broker/AMQPMessage.java | 761 +++++++++++++
.../amqp/broker/AMQPMessagePersister.java | 75 ++
.../amqp/broker/AMQPSessionCallback.java | 33 +-
.../broker/ProtonProtocolManagerFactory.java | 14 +
.../amqp/converter/ProtonMessageConverter.java | 12 +-
.../converter/jms/ServerJMSBytesMessage.java | 8 +-
.../amqp/converter/jms/ServerJMSMapMessage.java | 3 +-
.../amqp/converter/jms/ServerJMSMessage.java | 21 +-
.../converter/jms/ServerJMSObjectMessage.java | 3 +-
.../converter/jms/ServerJMSStreamMessage.java | 5 +-
.../converter/jms/ServerJMSTextMessage.java | 3 +-
.../converter/message/AMQPMessageSupport.java | 28 +-
.../message/AMQPNativeInboundTransformer.java | 44 -
.../message/AMQPRawInboundTransformer.java | 62 -
.../converter/message/InboundTransformer.java | 5 +-
.../message/JMSMappingInboundTransformer.java | 34 +-
.../message/JMSMappingOutboundTransformer.java | 3 +-
.../amqp/proton/AMQPConnectionContext.java | 4 +
.../proton/ProtonServerReceiverContext.java | 32 +-
.../amqp/proton/ProtonServerSenderContext.java | 25 +-
.../amqp/proton/ProtonTransactionHandler.java | 3 +-
.../amqp/proton/handler/ProtonHandler.java | 2 +-
.../protocol/amqp/util/DeliveryUtil.java | 13 -
.../protocol/amqp/util/NettyReadable.java | 139 +++
.../amqp/converter/TestConversions.java | 62 +-
.../JMSMappingInboundTransformerTest.java | 6 +-
.../JMSMappingOutboundTransformerTest.java | 38 +-
.../JMSTransformationSpeedComparisonTest.java | 36 +-
.../message/MessageTransformationTest.java | 19 +-
.../protocol/amqp/message/AMQPMessageTest.java | 63 ++
.../core/protocol/mqtt/MQTTPublishManager.java | 22 +-
.../protocol/mqtt/MQTTRetainMessageManager.java | 8 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 10 +-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 15 +-
.../openwire/OpenWireMessageConverter.java | 20 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 6 +-
.../core/protocol/openwire/amq/AMQSession.java | 9 +-
.../protocol/openwire/util/OpenWireUtil.java | 7 +-
.../ActiveMQStompProtocolMessageBundle.java | 3 +-
.../core/protocol/stomp/StompConnection.java | 14 +-
.../protocol/stomp/StompProtocolManager.java | 6 +-
.../core/protocol/stomp/StompSession.java | 35 +-
.../artemis/core/protocol/stomp/StompUtils.java | 6 +-
.../stomp/VersionedStompFrameHandler.java | 19 +-
.../stomp/v12/StompFrameHandlerV12.java | 4 +-
.../artemis/core/config/Configuration.java | 8 -
.../core/config/impl/ConfigurationImpl.java | 32 -
.../deployers/impl/FileConfigurationParser.java | 4 -
.../activemq/artemis/core/filter/Filter.java | 4 +-
.../artemis/core/filter/impl/FilterImpl.java | 12 +-
.../management/impl/AddressControlImpl.java | 6 +-
.../core/management/impl/QueueControlImpl.java | 10 +-
.../impl/openmbean/OpenTypeSupport.java | 16 +-
.../artemis/core/paging/PagedMessage.java | 5 +-
.../artemis/core/paging/PagingStore.java | 8 +-
.../core/paging/cursor/PagedReferenceImpl.java | 16 +-
.../cursor/impl/PageSubscriptionImpl.java | 4 +-
.../activemq/artemis/core/paging/impl/Page.java | 2 +-
.../core/paging/impl/PagedMessageImpl.java | 66 +-
.../core/paging/impl/PagingStoreImpl.java | 52 +-
.../core/persistence/StorageManager.java | 16 +-
.../journal/AbstractJournalStorageManager.java | 60 +-
.../impl/journal/AddMessageRecord.java | 8 +-
.../impl/journal/DescribeJournal.java | 17 +-
.../impl/journal/JournalRecordIds.java | 3 +
.../impl/journal/JournalStorageManager.java | 14 +-
.../journal/LargeMessageTXFailureCallback.java | 6 +-
.../impl/journal/LargeServerMessageImpl.java | 117 +-
.../journal/LargeServerMessagePersister.java | 73 ++
.../journal/codec/LargeMessageEncoding.java | 55 -
.../journal/codec/LargeMessagePersister.java | 63 ++
.../nullpm/NullStorageLargeServerMessage.java | 18 +-
.../impl/nullpm/NullStorageManager.java | 16 +-
.../artemis/core/postoffice/Binding.java | 9 +-
.../artemis/core/postoffice/Bindings.java | 6 +-
.../artemis/core/postoffice/PostOffice.java | 18 +-
.../core/postoffice/impl/BindingsImpl.java | 27 +-
.../core/postoffice/impl/DivertBinding.java | 8 +-
.../core/postoffice/impl/LocalQueueBinding.java | 8 +-
.../core/postoffice/impl/PostOfficeImpl.java | 84 +-
.../core/protocol/ServerPacketDecoder.java | 6 +-
.../core/ServerSessionPacketHandler.java | 82 +-
.../core/impl/ActiveMQPacketHandler.java | 2 +-
.../core/impl/CoreProtocolManagerFactory.java | 14 +
.../protocol/core/impl/CoreSessionCallback.java | 8 +-
.../impl/wireformat/ReplicationAddMessage.java | 14 +-
.../wireformat/ReplicationAddTXMessage.java | 13 +-
.../wireformat/ReplicationPageWriteMessage.java | 2 +-
.../core/remoting/server/RemotingService.java | 4 +
.../server/impl/RemotingServiceImpl.java | 11 +-
.../core/replication/ReplicatedJournal.java | 52 +-
.../core/replication/ReplicationEndpoint.java | 7 +-
.../core/replication/ReplicationManager.java | 11 +-
.../core/server/ActiveMQServerLogger.java | 9 +-
.../activemq/artemis/core/server/Bindable.java | 6 +-
.../artemis/core/server/LargeServerMessage.java | 3 +-
.../artemis/core/server/MessageReference.java | 10 +-
.../activemq/artemis/core/server/Queue.java | 3 +-
.../artemis/core/server/ServerMessage.java | 78 --
.../artemis/core/server/ServerSession.java | 22 +-
.../core/server/cluster/Transformer.java | 4 +-
.../core/server/cluster/impl/BridgeImpl.java | 17 +-
.../cluster/impl/ClusterConnectionBridge.java | 14 +-
.../core/server/cluster/impl/Redistributor.java | 3 +-
.../cluster/impl/RemoteQueueBindingImpl.java | 14 +-
.../core/server/impl/ActiveMQServerImpl.java | 1 +
.../artemis/core/server/impl/DivertImpl.java | 10 +-
.../artemis/core/server/impl/JournalLoader.java | 6 +-
.../core/server/impl/LastValueQueue.java | 10 +-
.../core/server/impl/MessageReferenceImpl.java | 24 +-
.../server/impl/PostOfficeJournalLoader.java | 7 +-
.../artemis/core/server/impl/QueueImpl.java | 65 +-
.../artemis/core/server/impl/RefsOperation.java | 4 +-
.../core/server/impl/ScaleDownHandler.java | 45 +-
.../core/server/impl/ServerConsumerImpl.java | 27 +-
.../core/server/impl/ServerMessageImpl.java | 341 ------
.../core/server/impl/ServerSessionImpl.java | 118 +-
.../server/management/ManagementService.java | 5 +-
.../management/impl/ManagementServiceImpl.java | 12 +-
.../core/transaction/TransactionDetail.java | 9 +-
.../transaction/impl/CoreTransactionDetail.java | 6 +-
.../spi/core/protocol/MessageConverter.java | 7 +-
.../spi/core/protocol/MessagePersister.java | 88 ++
.../spi/core/protocol/ProtocolManager.java | 2 +
.../core/protocol/ProtocolManagerFactory.java | 15 +
.../spi/core/protocol/SessionCallback.java | 6 +-
.../resources/schema/artemis-configuration.xsd | 16 -
.../core/config/impl/ConfigurationImplTest.java | 9 -
.../artemis/core/filter/impl/FilterTest.java | 12 +-
.../group/impl/ClusteredResetMockTest.java | 5 +-
.../impl/ScheduledDeliveryHandlerTest.java | 196 ++--
.../transaction/impl/TransactionImplTest.java | 16 +-
.../artemis/tests/util/ActiveMQTestBase.java | 13 +-
.../resources/ConfigurationTest-full-config.xml | 2 -
.../test/resources/artemis-configuration.xsd | 16 -
.../jms/example/HatColourChangeTransformer.java | 2 +-
.../example/AddForwardingTimeTransformer.java | 2 +-
pom.xml | 3 +-
.../amqp/client/util/UnmodifiableDelivery.java | 5 +
.../journal/gcfree/EncodersBench.java | 5 +-
.../byteman/JMSBridgeReconnectionTest.java | 4 +-
.../tests/extras/byteman/MessageCopyTest.java | 163 ---
.../integration/DuplicateDetectionTest.java | 6 +-
.../tests/integration/amqp/ProtonTest.java | 22 +-
.../integration/client/AckBatchSizeTest.java | 14 +-
.../integration/client/AcknowledgeTest.java | 177 ++-
.../tests/integration/client/ConsumerTest.java | 163 ++-
.../integration/client/HangConsumerTest.java | 7 +-
.../InVMNonPersistentMessageBufferTest.java | 36 +-
.../client/InterruptedLargeMessageTest.java | 10 +-
.../integration/client/LargeMessageTest.java | 4 +-
.../integration/cluster/bridge/BridgeTest.java | 10 +-
.../cluster/bridge/SimpleTransformer.java | 61 +-
.../distribution/ClusterHeadersRemovedTest.java | 5 +-
.../distribution/MessageRedistributionTest.java | 4 +-
.../tests/integration/divert/DivertTest.java | 5 +-
.../interceptors/InterceptorTest.java | 8 +-
.../integration/journal/MessageJournalTest.java | 133 +++
.../journal/NIOJournalCompactTest.java | 6 +-
.../integration/karaf/ContainerBaseTest.java | 64 ++
.../tests/integration/karaf/KarafBaseTest.java | 212 ++++
.../karaf/distribution/ArtemisFeatureTest.java | 101 ++
.../karaf/distribution/package-info.java | 21 +
.../karaf/version/ProbeRemoteServer.java | 51 +
.../integration/karaf/version/RemoteTest.java | 38 +
.../karaf/version/VersionWireTest.java | 104 ++
.../integration/karaf/version/package-info.java | 21 +
.../management/ManagementServiceImplTest.java | 24 +-
.../DeleteMessagesOnStartupTest.java | 10 +-
.../replication/ReplicationTest.java | 71 +-
.../integration/server/FakeStorageManager.java | 6 +-
.../storage/PersistMultiThreadTest.java | 31 +-
.../stress/paging/PageCursorStressTest.java | 24 +-
.../core/server/impl/QueueConcurrentTest.java | 6 +-
tests/unit-tests/pom.xml | 6 +
.../core/journal/impl/JournalImplTestUnit.java | 2 +-
.../unit/core/message/impl/MessageImplTest.java | 9 +-
.../tests/unit/core/paging/impl/PageTest.java | 42 +-
.../core/paging/impl/PagingManagerImplTest.java | 16 +-
.../core/paging/impl/PagingStoreImplTest.java | 38 +-
.../core/postoffice/impl/BindingsImplTest.java | 16 +-
.../unit/core/postoffice/impl/FakeQueue.java | 9 +-
.../impl/WildcardAddressManagerUnitTest.java | 12 +-
.../unit/core/server/impl/QueueImplTest.java | 4 +-
.../unit/core/server/impl/fakes/FakeFilter.java | 7 +-
.../server/impl/fakes/FakeJournalLoader.java | 6 +-
.../core/server/impl/fakes/FakePostOffice.java | 22 +-
.../tests/unit/util/FakePagingManager.java | 7 +-
.../artemis/tests/unit/util/MemorySizeTest.java | 4 +-
.../artemis/tests/unit/util/UTF8Test.java | 10 +-
252 files changed, 6442 insertions(+), 4208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 408aef5..2816aaf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -50,16 +51,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData extends OptionalLocking {
+ static {
+ MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
+ }
+
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index 4f99181..b57b5c5 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -50,7 +50,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -75,7 +75,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
@@ -367,7 +367,7 @@ public final class XmlDataExporter extends OptionalLocking {
// Order here is important. We must process the messages from the journal before we process those from the page
// files in order to get the messages in the right order.
for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
- printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+ printSingleMessageAsXML(messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
}
printPagedMessagesAsXML();
@@ -381,6 +381,8 @@ public final class XmlDataExporter extends OptionalLocking {
*/
private void printPagedMessagesAsXML() {
try {
+
+ // TODO-now: fix encodings
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new ExecutorFactory() {
@@ -456,7 +458,7 @@ public final class XmlDataExporter extends OptionalLocking {
}
}
- private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException {
+ private void printSingleMessageAsXML(Message message, List<String> queues) throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
printMessageAttributes(message);
printMessageProperties(message);
@@ -466,7 +468,7 @@ public final class XmlDataExporter extends OptionalLocking {
messagesPrinted++;
}
- private void printMessageBody(ServerMessage message) throws XMLStreamException {
+ private void printMessageBody(Message message) throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
if (message.isLargeMessage()) {
@@ -479,7 +481,7 @@ public final class XmlDataExporter extends OptionalLocking {
private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
- BodyEncoder encoder = null;
+ LargeBodyEncoder encoder = null;
try {
encoder = message.getBodyEncoder();
@@ -522,7 +524,7 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter.writeEndElement(); // end QUEUES_PARENT
}
- private void printMessageProperties(ServerMessage message) throws XMLStreamException {
+ private void printMessageProperties(Message message) throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
for (SimpleString key : message.getPropertyNames()) {
Object value = message.getObjectProperty(key);
@@ -539,7 +541,7 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
}
- private void printMessageAttributes(ServerMessage message) throws XMLStreamException {
+ private void printMessageAttributes(Message message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
index 8ee7678..a3807bd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
@@ -17,10 +17,9 @@
package org.apache.activemq.artemis.cli.commands.tools;
import com.google.common.base.Preconditions;
-
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.Base64;
/**
@@ -92,10 +91,10 @@ public class XmlDataExporterUtil {
* @param message
* @return
*/
- public static String encodeMessageBody(final ServerMessage message) {
+ public static String encodeMessageBody(final Message message) {
Preconditions.checkNotNull(message, "ServerMessage can not be null");
- int size = message.getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
+ int size = ((CoreMessage)message.toCore()).getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
byte[] buffer = new byte[size];
message.getBodyBuffer().readBytes(buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
index 8e2bb9f..0f06738 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
@@ -59,7 +59,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.commands.ActionAbstract;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -387,7 +386,7 @@ public final class XmlDataImporter extends ActionAbstract {
logger.debug(logMessage);
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
try (ClientProducer producer = session.createProducer(destination)) {
producer.send(message);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
new file mode 100644
index 0000000..2f00c5d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis;
+
+public interface Closeable {
+ void close(boolean failed);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index 5446f3f..3a208a6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
*/
void writeBytes(ByteBuffer src);
+
+ /**
+ * Transfers the specified source buffer's data to this buffer starting at
+ * the current {@code writerIndex}: {@code length} bytes are taken from
+ * {@code src} beginning at {@code srcIndex}, and the {@code writerIndex}
+ * is increased by the number of the transferred bytes.
+ *
+ * @param src The source buffer
+ * @throws IndexOutOfBoundsException if {@code length} is greater than
+ * {@code this.writableBytes}
+ */
+ void writeBytes(ByteBuf src, int srcIndex, int length);
+
/**
* Returns a copy of this buffer's readable bytes. Modifying the content
* of the returned buffer or this buffer does not affect each other at all.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
index 32f9279..25fcfea 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -76,6 +77,20 @@ public final class ActiveMQBuffers {
}
/**
+ * Creates an ActiveMQBuffer wrapping an underlying ByteBuf
+ *
+ * The position on this buffer won't affect the position on the inner buffer
+ *
+ * @param underlying the underlying Netty ByteBuf
+ * @return an ActiveMQBuffer wrapping a duplicate of the underlying ByteBuf
+ */
+ public static ActiveMQBuffer wrappedBuffer(final ByteBuf underlying) {
+ ActiveMQBuffer buff = new ChannelBufferWrapper(underlying.duplicate());
+
+ return buff;
+ }
+
+ /**
* Creates an ActiveMQBuffer wrapping an underlying byte array
*
* @param underlying the underlying byte array
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index b7f70c6..e8530e6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@@ -134,6 +135,39 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
}
+ public static SimpleString readNullableSimpleString(ByteBuf buffer) {
+ int b = buffer.readByte();
+ if (b == DataConstants.NULL) {
+ return null;
+ }
+ return readSimpleString(buffer);
+ }
+
+
+ public static SimpleString readSimpleString(ByteBuf buffer) {
+ int len = buffer.readInt();
+ byte[] data = new byte[len];
+ buffer.readBytes(data);
+ return new SimpleString(data);
+ }
+
+ public static void writeNullableSimpleString(ByteBuf buffer, SimpleString val) {
+ if (val == null) {
+ buffer.writeByte(DataConstants.NULL);
+ } else {
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeSimpleString(buffer, val);
+ }
+ }
+
+ public static void writeSimpleString(ByteBuf buffer, SimpleString val) {
+ byte[] data = val.getData();
+ buffer.writeInt(data.length);
+ buffer.writeBytes(data);
+ }
+
+
+
public SimpleString subSeq(final int start, final int end) {
int len = data.length >> 1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 690dbd7..b2660fa 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -66,11 +66,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
@Override
public SimpleString readNullableSimpleString() {
- int b = buffer.readByte();
- if (b == DataConstants.NULL) {
- return null;
- }
- return readSimpleStringInternal();
+ return SimpleString.readNullableSimpleString(buffer);
}
@Override
@@ -84,14 +80,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
@Override
public SimpleString readSimpleString() {
- return readSimpleStringInternal();
- }
-
- private SimpleString readSimpleStringInternal() {
- int len = buffer.readInt();
- byte[] data = new byte[len];
- buffer.readBytes(data);
- return new SimpleString(data);
+ return SimpleString.readSimpleString(buffer);
}
@Override
@@ -111,11 +100,21 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
} else if (len < 0xfff) {
return readUTF();
} else {
- return readSimpleStringInternal().toString();
+ return SimpleString.readSimpleString(buffer).toString(); // writeString emits no NULL/NOT_NULL marker, so read the length-prefixed form
}
}
@Override
+ public void writeNullableString(String val) {
+ UTF8Util.writeNullableString(buffer, val);
+ }
+
+ @Override
+ public void writeUTF(String utf) {
+ UTF8Util.saveUTF(buffer, utf);
+ }
+
+ @Override
public String readUTF() {
return UTF8Util.readUTF(this);
}
@@ -127,62 +126,17 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
@Override
public void writeNullableSimpleString(final SimpleString val) {
- if (val == null) {
- buffer.writeByte(DataConstants.NULL);
- } else {
- buffer.writeByte(DataConstants.NOT_NULL);
- writeSimpleStringInternal(val);
- }
- }
-
- @Override
- public void writeNullableString(final String val) {
- if (val == null) {
- buffer.writeByte(DataConstants.NULL);
- } else {
- buffer.writeByte(DataConstants.NOT_NULL);
- writeStringInternal(val);
- }
+ SimpleString.writeNullableSimpleString(buffer, val);
}
@Override
public void writeSimpleString(final SimpleString val) {
- writeSimpleStringInternal(val);
- }
-
- private void writeSimpleStringInternal(final SimpleString val) {
- byte[] data = val.getData();
- buffer.writeInt(data.length);
- buffer.writeBytes(data);
+ SimpleString.writeSimpleString(buffer, val);
}
@Override
public void writeString(final String val) {
- writeStringInternal(val);
- }
-
- private void writeStringInternal(final String val) {
- int length = val.length();
-
- buffer.writeInt(length);
-
- if (length < 9) {
- // If very small it's more performant to store char by char
- for (int i = 0; i < val.length(); i++) {
- buffer.writeShort((short) val.charAt(i));
- }
- } else if (length < 0xfff) {
- // Store as UTF - this is quicker than char by char for most strings
- writeUTF(val);
- } else {
- // Store as SimpleString, since can't store utf > 0xffff in length
- writeSimpleStringInternal(new SimpleString(val));
- }
- }
-
- @Override
- public void writeUTF(final String utf) {
- UTF8Util.saveUTF(this, utf);
+ UTF8Util.writeString(buffer, val);
}
@Override
@@ -576,6 +530,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ buffer.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
buffer.writeBytes(src.byteBuf(), srcIndex, length);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
new file mode 100644
index 0000000..fd68a77
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+public interface Persister<T extends Object> {
+
+ int getEncodeSize(T record);
+
+ void encode(ActiveMQBuffer buffer, T record);
+
+ T decode(ActiveMQBuffer buffer, T record);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index bee8790..e70891d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -101,6 +101,14 @@ public class ByteUtil {
}
public static String bytesToHex(byte[] bytes, int groupSize) {
+ if (bytes == null) {
+ return "NULL";
+ }
+
+ if (bytes.length == 0) {
+ return "[]";
+ }
+
char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
int outPos = 0;
for (int j = 0; j < bytes.length; j++) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
index 56cec48..a421484 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -47,21 +47,23 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING;
* This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
* (Version 1.1 April 12, 2002).
* <p>
- * TODO - should have typed property getters and do conversions herein
*/
public final class TypedProperties {
- private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
-
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
- private boolean internalProperties;
-
public TypedProperties() {
}
+ /**
+ * Returns the number of properties, or 0 while the backing map is still null (nothing was ever put).
+ * */
+ public int size() {
+ return properties == null ? 0 : properties.size();
+ }
+
public int getMemoryOffset() {
// The estimate is basically the encode size + 2 object references for each entry in the map
// Note we don't include the attributes or anything else since they already included in the memory estimate
@@ -75,10 +77,6 @@ public final class TypedProperties {
size = other.size;
}
- public boolean hasInternalProperties() {
- return internalProperties;
- }
-
public void putBooleanProperty(final SimpleString key, final boolean value) {
checkCreateProperties();
doPutValue(key, new BooleanValue(value));
@@ -321,7 +319,7 @@ public final class TypedProperties {
}
}
- public synchronized void decode(final ActiveMQBuffer buffer) {
+ public synchronized void decode(final ByteBuf buffer) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
@@ -406,7 +404,7 @@ public final class TypedProperties {
}
}
- public synchronized void encode(final ActiveMQBuffer buffer) {
+ public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
buffer.writeByte(DataConstants.NULL);
} else {
@@ -499,10 +497,6 @@ public final class TypedProperties {
}
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
- if (key.startsWith(AMQ_PROPNAME)) {
- internalProperties = true;
- }
-
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null) {
size += value.encodeSize() - oldValue.encodeSize();
@@ -547,7 +541,7 @@ public final class TypedProperties {
abstract Object getValue();
- abstract void write(ActiveMQBuffer buffer);
+ abstract void write(ByteBuf buffer);
abstract int encodeSize();
@@ -568,7 +562,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.NULL);
}
@@ -587,7 +581,7 @@ public final class TypedProperties {
this.val = val;
}
- private BooleanValue(final ActiveMQBuffer buffer) {
+ private BooleanValue(final ByteBuf buffer) {
val = buffer.readBoolean();
}
@@ -597,7 +591,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BOOLEAN);
buffer.writeBoolean(val);
}
@@ -617,7 +611,7 @@ public final class TypedProperties {
this.val = val;
}
- private ByteValue(final ActiveMQBuffer buffer) {
+ private ByteValue(final ByteBuf buffer) {
val = buffer.readByte();
}
@@ -627,7 +621,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTE);
buffer.writeByte(val);
}
@@ -646,7 +640,7 @@ public final class TypedProperties {
this.val = val;
}
- private BytesValue(final ActiveMQBuffer buffer) {
+ private BytesValue(final ByteBuf buffer) {
int len = buffer.readInt();
val = new byte[len];
buffer.readBytes(val);
@@ -658,7 +652,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.BYTES);
buffer.writeInt(val.length);
buffer.writeBytes(val);
@@ -679,7 +673,7 @@ public final class TypedProperties {
this.val = val;
}
- private ShortValue(final ActiveMQBuffer buffer) {
+ private ShortValue(final ByteBuf buffer) {
val = buffer.readShort();
}
@@ -689,7 +683,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.SHORT);
buffer.writeShort(val);
}
@@ -708,7 +702,7 @@ public final class TypedProperties {
this.val = val;
}
- private IntValue(final ActiveMQBuffer buffer) {
+ private IntValue(final ByteBuf buffer) {
val = buffer.readInt();
}
@@ -718,7 +712,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.INT);
buffer.writeInt(val);
}
@@ -737,7 +731,7 @@ public final class TypedProperties {
this.val = val;
}
- private LongValue(final ActiveMQBuffer buffer) {
+ private LongValue(final ByteBuf buffer) {
val = buffer.readLong();
}
@@ -747,7 +741,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.LONG);
buffer.writeLong(val);
}
@@ -766,7 +760,7 @@ public final class TypedProperties {
this.val = val;
}
- private FloatValue(final ActiveMQBuffer buffer) {
+ private FloatValue(final ByteBuf buffer) {
val = Float.intBitsToFloat(buffer.readInt());
}
@@ -776,7 +770,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.FLOAT);
buffer.writeInt(Float.floatToIntBits(val));
}
@@ -796,7 +790,7 @@ public final class TypedProperties {
this.val = val;
}
- private DoubleValue(final ActiveMQBuffer buffer) {
+ private DoubleValue(final ByteBuf buffer) {
val = Double.longBitsToDouble(buffer.readLong());
}
@@ -806,7 +800,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.DOUBLE);
buffer.writeLong(Double.doubleToLongBits(val));
}
@@ -825,7 +819,7 @@ public final class TypedProperties {
this.val = val;
}
- private CharValue(final ActiveMQBuffer buffer) {
+ private CharValue(final ByteBuf buffer) {
val = (char) buffer.readShort();
}
@@ -835,7 +829,7 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.CHAR);
buffer.writeShort((short) val);
}
@@ -854,8 +848,8 @@ public final class TypedProperties {
this.val = val;
}
- private StringValue(final ActiveMQBuffer buffer) {
- val = buffer.readSimpleString();
+ private StringValue(final ByteBuf buffer) {
+ val = SimpleString.readSimpleString(buffer);
}
@Override
@@ -864,9 +858,9 @@ public final class TypedProperties {
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public void write(final ByteBuf buffer) {
buffer.writeByte(DataConstants.STRING);
- buffer.writeSimpleString(val);
+ SimpleString.writeSimpleString(buffer, val);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
index e75395b..84e1557 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
import java.lang.ref.SoftReference;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
@@ -29,15 +31,43 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
*/
public final class UTF8Util {
+
+ private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+
+ private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+
private UTF8Util() {
// utility class
}
+ public static void writeNullableString(ByteBuf buffer, final String val) {
+ if (val == null) {
+ buffer.writeByte(DataConstants.NULL);
+ } else {
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeString(buffer, val);
+ }
+ }
- private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+ public static void writeString(final ByteBuf buffer, final String val) {
+ int length = val.length();
- private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+ buffer.writeInt(length);
+
+ if (length < 9) {
+ // If very small it's more performant to store char by char
+ for (int i = 0; i < val.length(); i++) {
+ buffer.writeShort((short) val.charAt(i));
+ }
+ } else if (length < 0xfff) {
+ // Store as UTF - this is quicker than char by char for most strings
+ saveUTF(buffer, val);
+ } else {
+ // Store as SimpleString, since can't store utf > 0xffff in length
+ SimpleString.writeSimpleString(buffer, new SimpleString(val));
+ }
+ }
- public static void saveUTF(final ActiveMQBuffer out, final String str) {
+ public static void saveUTF(final ByteBuf out, final String str) {
StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
if (str.length() > 0xffff) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 8013e96..cb6c8fe 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -187,12 +187,12 @@ public class TypedPropertiesTest {
props.putSimpleStringProperty(keyToRemove, RandomUtil.randomSimpleString());
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
- props.encode(buffer);
+ props.encode(buffer.byteBuf());
Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
TypedProperties decodedProps = new TypedProperties();
- decodedProps.decode(buffer);
+ decodedProps.decode(buffer.byteBuf());
TypedPropertiesTest.assertEqualsTypeProperties(props, decodedProps);
@@ -200,7 +200,7 @@ public class TypedPropertiesTest {
// After removing a property, you should still be able to encode the Property
props.removeProperty(keyToRemove);
- props.encode(buffer);
+ props.encode(buffer.byteBuf());
Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
}
@@ -210,12 +210,12 @@ public class TypedPropertiesTest {
TypedProperties emptyProps = new TypedProperties();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
- emptyProps.encode(buffer);
+ emptyProps.encode(buffer.byteBuf());
Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
TypedProperties decodedProps = new TypedProperties();
- decodedProps.decode(buffer);
+ decodedProps.decode(buffer.byteBuf());
TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 38ec105..c0d9db6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -262,12 +262,6 @@ public final class ActiveMQDefaultConfiguration {
// The minimal number of data files before we can start compacting
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
- // XXX Only meant to be used by project developers
- private static int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
-
- // XXX Only meant to be used by project developers
- private static boolean DEFAULT_RUN_SYNC_SPEED_TEST = false;
-
// Interval to log server specific information (e.g. memory usage etc)
private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
@@ -801,20 +795,6 @@ public final class ActiveMQDefaultConfiguration {
}
/**
- * XXX Only meant to be used by project developers
- */
- public static int getDefaultJournalPerfBlastPages() {
- return DEFAULT_JOURNAL_PERF_BLAST_PAGES;
- }
-
- /**
- * XXX Only meant to be used by project developers
- */
- public static boolean isDefaultRunSyncSpeedTest() {
- return DEFAULT_RUN_SYNC_SPEED_TEST;
- }
-
- /**
* Interval to log server specific information (e.g. memory usage etc)
*/
public static long getDefaultServerDumpInterval() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 80116ed..4a5381c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,10 +16,14 @@
*/
package org.apache.activemq.artemis.api.core;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.activemq.artemis.utils.UUID;
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
/**
* A Message is a routable instance that has a payload.
@@ -48,9 +52,41 @@ import org.apache.activemq.artemis.utils.UUID;
* <p>
* If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a
* {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown.
+ *
+ *
+ * Use cases that will be covered by Message
+ *
+ * Receiving a buffer:
+ *
+ * Message encode = new CoreMessage(); // or any other implementation
+ * encode.receiveBuffer(buffer);
+ *
+ *
+ * Sending to a buffer:
+ *
+ * Message encode;
+ * size = encode.getEncodeSize();
+ * encode.encodeDirectly(bufferOutput);
+ *
+ *
+ * Disabling temporary buffer:
+ *
+ * // This makes the message be encoded only directly to the output stream; useful on the client core API
+ * encode.disableInternalBuffer();
+
*/
public interface Message {
+
+ SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
+
+ SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
+
+ SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
+
+ // used by the bridges to set duplicates
+ SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
+
/**
* the actual time the message was expired.
* * *
@@ -129,6 +165,60 @@ public interface Message {
byte STREAM_TYPE = 6;
+
+ void messageChanged();
+
+ /**
+ * Careful: Unless you are changing the body of the message, prefer getReadOnlyBodyBuffer
+ */
+ ActiveMQBuffer getBodyBuffer();
+
+ ActiveMQBuffer getReadOnlyBodyBuffer();
+
+ /** Used in the cases of large messages */
+ LargeBodyEncoder getBodyEncoder() throws ActiveMQException;
+
+ /** Context can be used by the application server to inject extra control, like a protocol specific on the server.
+ * There is only one per Object, use it wisely!
+ *
+ * Note: the intent of this was to replace the PageStore reference on Message, but it will later be extended by adding a ServerPojo
+ * */
+ RefCountMessageListener getContext();
+
+ Message setContext(RefCountMessageListener context);
+
+ /** The buffer will belong to this message, until release is called. */
+ Message setBuffer(ByteBuf buffer);
+
+ // TODO-now: Do we need this?
+ byte getType();
+
+ // TODO-now: Do we need this?
+ Message setType(byte type);
+
+ /**
+ * Returns whether this message is a <em>large message</em> or a regular message.
+ */
+ boolean isLargeMessage();
+
+ /**
+ * TODO: There's currently some treatment on LargeMessage that is done for server's side large message
+ * This needs to be refactored, this Method shouldn't be used at all.
+ * @deprecated do not use this, internal use only. *It will* be removed for sure even on minor releases.
+ * */
+ @Deprecated
+ default boolean isServerMessage() {
+ return false;
+ }
+
+ ByteBuf getBuffer();
+
+ /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+ Message copy();
+
+ /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+ Message copy(long newID);
+
/**
* Returns the messageID.
* <br>
@@ -136,39 +226,43 @@ public interface Message {
*/
long getMessageID();
+ Message setMessageID(long id);
+
/**
- * Returns the userID - this is an optional user specified UUID that can be set to identify the message
- * and will be passed around with the message
- *
- * @return the user id
+ * Returns the expiration time of this message.
*/
- UUID getUserID();
+ long getExpiration();
/**
- * Sets the user ID
+ * Sets the expiration of this message.
*
- * @param userID
+ * @param expiration expiration time
*/
- Message setUserID(UUID userID);
+ Message setExpiration(long expiration);
/**
- * Returns the address this message is sent to.
+ * Returns whether this message is expired or not.
*/
- SimpleString getAddress();
+ default boolean isExpired() {
+ if (getExpiration() == 0) {
+ return false;
+ }
+
+ return System.currentTimeMillis() - getExpiration() >= 0;
+ }
+
/**
- * Sets the address to send this message to.
+ * Returns the userID - this is an optional user specified UUID that can be set to identify the message
+ * and will be passed around with the message
*
- * @param address address to send the message to
+ * @return the user id
*/
- Message setAddress(SimpleString address);
+ Object getUserID();
- /**
- * Returns this message type.
- * <p>
- * See fields {@literal *_TYPE} for possible values.
- */
- byte getType();
+ Message setUserID(Object userID);
+
+ void copyHeadersAndProperties(final Message msg);
/**
* Returns whether this message is durable or not.
@@ -182,36 +276,28 @@ public interface Message {
*/
Message setDurable(boolean durable);
- /**
- * Returns the expiration time of this message.
- */
- long getExpiration();
+ Persister<Message> getPersister();
- /**
- * Returns whether this message is expired or not.
- */
- boolean isExpired();
+ Object getProtocol();
- /**
- * Sets the expiration of this message.
- *
- * @param expiration expiration time
- */
- Message setExpiration(long expiration);
+ Message setProtocol(Object protocol);
+
+ Object getBody();
+
+ BodyType getBodyType();
+
+ Message setBody(BodyType type, Object body);
+
+ String getAddress();
+
+ Message setAddress(String address);
+
+ SimpleString getAddressSimpleString();
+
+ Message setAddress(SimpleString address);
- /**
- * Returns the message timestamp.
- * <br>
- * The timestamp corresponds to the time this message
- * was handled by an ActiveMQ Artemis server.
- */
long getTimestamp();
- /**
- * Sets the message timestamp.
- *
- * @param timestamp timestamp
- */
Message setTimestamp(long timestamp);
/**
@@ -230,164 +316,128 @@ public interface Message {
*/
Message setPriority(byte priority);
- /**
- * Returns the size of the <em>encoded</em> message.
- */
- int getEncodeSize();
+ /** Used to receive this message from an encoded medium buffer */
+ void receiveBuffer(ByteBuf buffer);
- /**
- * Returns whether this message is a <em>large message</em> or a regular message.
- */
- boolean isLargeMessage();
+ /** Used to send this message to an encoded medium buffer.
+ * @param buffer the buffer used.
+ * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */
+ void sendBuffer(ByteBuf buffer, int deliveryCount);
- /**
- * Returns the message body as an ActiveMQBuffer
- */
- ActiveMQBuffer getBodyBuffer();
+ int getPersistSize();
- /**
- * Writes the input byte array to the message body ActiveMQBuffer
- */
- Message writeBodyBufferBytes(byte[] bytes);
+ void persist(ActiveMQBuffer targetRecord);
- /**
- * Writes the input String to the message body ActiveMQBuffer
- */
- Message writeBodyBufferString(String string);
+ void reloadPersistence(ActiveMQBuffer record);
- /**
- * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
- * of this buffer should not impact the underlying buffer.
- */
- ActiveMQBuffer getBodyBufferDuplicate();
+ default void releaseBuffer() {
+ ByteBuf buffer = getBuffer();
+ if (buffer != null) {
+ buffer.release();
+ }
+ setBuffer(null);
+ }
- // Properties
- // -----------------------------------------------------------------
+ default String getText() {
+ if (getBodyType() == BodyType.Text) {
+ return getBody().toString();
+ } else {
+ return null;
+ }
+ }
- /**
- * Puts a boolean property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putBooleanProperty(SimpleString key, boolean value);
+ // TODO-now: move this to some utility class
+ default void referenceOriginalMessage(final Message original, String originalQueue) {
+ String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
- /**
- * @see #putBooleanProperty(SimpleString, boolean)
- */
- Message putBooleanProperty(String key, boolean value);
+ if (queueOnMessage != null) {
+ putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+ } else if (originalQueue != null) {
+ putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+ }
- /**
- * Puts a byte property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putByteProperty(SimpleString key, byte value);
+ if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+ putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
- /**
- * @see #putByteProperty(SimpleString, byte)
- */
- Message putByteProperty(String key, byte value);
+ putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+ } else {
+ putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
- /**
- * Puts a byte[] property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putBytesProperty(SimpleString key, byte[] value);
+ putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+ }
- /**
- * @see #putBytesProperty(SimpleString, byte[])
- */
- Message putBytesProperty(String key, byte[] value);
+ // reset expiry
+ setExpiration(0);
+ }
/**
- * Puts a short property in this message.
- *
- * @param key property name
- * @param value property value
+ * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+ * TODO-NOW: this can probably be replaced by an utility.
+ * @return
*/
- Message putShortProperty(SimpleString key, short value);
+ default byte[] getDuplicateIDBytes() {
+ Object duplicateID = getDuplicateProperty();
- /**
- * @see #putShortProperty(SimpleString, short)
- */
- Message putShortProperty(String key, short value);
+ if (duplicateID == null) {
+ return null;
+ } else {
+ if (duplicateID instanceof SimpleString) {
+ return ((SimpleString) duplicateID).getData();
+ } else if (duplicateID instanceof String) {
+ return new SimpleString(duplicateID.toString()).getData();
+ } else {
+ return (byte[]) duplicateID;
+ }
+ }
+ }
/**
- * Puts a char property in this message.
- *
- * @param key property name
- * @param value property value
+ * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+ * TODO-NOW: this can probably be replaced by an utility.
+ * @return
*/
- Message putCharProperty(SimpleString key, char value);
+ default Object getDuplicateProperty() {
+ return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString());
+ }
- /**
- * @see #putCharProperty(SimpleString, char)
- */
- Message putCharProperty(String key, char value);
- /**
- * Puts an int property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putIntProperty(SimpleString key, int value);
+ Message putBooleanProperty(String key, boolean value);
- /**
- * @see #putIntProperty(SimpleString, int)
- */
- Message putIntProperty(String key, int value);
+ Message putByteProperty(String key, byte value);
- /**
- * Puts a long property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putLongProperty(SimpleString key, long value);
+ Message putBytesProperty(String key, byte[] value);
- /**
- * @see #putLongProperty(SimpleString, long)
- */
- Message putLongProperty(String key, long value);
+ Message putShortProperty(String key, short value);
- /**
- * Puts a float property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putFloatProperty(SimpleString key, float value);
+ Message putCharProperty(String key, char value);
- /**
- * @see #putFloatProperty(SimpleString, float)
- */
- Message putFloatProperty(String key, float value);
+ Message putIntProperty(String key, int value);
- /**
- * Puts a double property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putDoubleProperty(SimpleString key, double value);
+ Message putLongProperty(String key, long value);
+
+ Message putFloatProperty(String key, float value);
- /**
- * @see #putDoubleProperty(SimpleString, double)
- */
Message putDoubleProperty(String key, double value);
- /**
- * Puts a SimpleString property in this message.
- *
- * @param key property name
- * @param value property value
- */
- Message putStringProperty(SimpleString key, SimpleString value);
+
+
+ Message putBooleanProperty(SimpleString key, boolean value);
+
+ Message putByteProperty(SimpleString key, byte value);
+
+ Message putBytesProperty(SimpleString key, byte[] value);
+
+ Message putShortProperty(SimpleString key, short value);
+
+ Message putCharProperty(SimpleString key, char value);
+
+ Message putIntProperty(SimpleString key, int value);
+
+ Message putLongProperty(SimpleString key, long value);
+
+ Message putFloatProperty(SimpleString key, float value);
+
+ Message putDoubleProperty(SimpleString key, double value);
/**
* Puts a String property in this message.
@@ -397,202 +447,125 @@ public interface Message {
*/
Message putStringProperty(String key, String value);
- /**
- * Puts an Object property in this message. <br>
- * Accepted types are:
- * <ul>
- * <li>Boolean</li>
- * <li>Byte</li>
- * <li>Short</li>
- * <li>Character</li>
- * <li>Integer</li>
- * <li>Long</li>
- * <li>Float</li>
- * <li>Double</li>
- * <li>String</li>
- * <li>SimpleString</li>
- * </ul>
- * Using any other type will throw a PropertyConversionException.
- *
- * @param key property name
- * @param value property value
- * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property
- * types.
- */
- Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #putObjectProperty(SimpleString, Object)
- */
Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException;
- /**
- * Removes the property corresponding to the specified key.
- *
- * @param key property name
- * @return the value corresponding to the specified key or @{code null}
- */
- Object removeProperty(SimpleString key);
+ Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
- /**
- * @see #removeProperty(SimpleString)
- */
Object removeProperty(String key);
- /**
- * Returns {@code true} if this message contains a property with the given key, {@code false} else.
- *
- * @param key property name
- */
- boolean containsProperty(SimpleString key);
-
- /**
- * @see #containsProperty(SimpleString)
- */
boolean containsProperty(String key);
- /**
- * Returns the property corresponding to the specified key as a Boolean.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean
- */
- Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getBooleanProperty(SimpleString)
- */
Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Byte.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte
- */
- Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getByteProperty(SimpleString)
- */
Byte getByteProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Double.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double
- */
- Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getDoubleProperty(SimpleString)
- */
Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as an Integer.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer
- */
- Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getIntProperty(SimpleString)
- */
Integer getIntProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Long.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long
- */
- Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getLongProperty(SimpleString)
- */
Long getLongProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key
- */
- Object getObjectProperty(SimpleString key);
-
- /**
- * @see #getBooleanProperty(SimpleString)
- */
Object getObjectProperty(String key);
- /**
- * Returns the property corresponding to the specified key as a Short.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short
- */
- Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getShortProperty(SimpleString)
- */
Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a Float.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float
- */
- Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getFloatProperty(SimpleString)
- */
Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a String.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a String
- */
- String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getStringProperty(SimpleString)
- */
String getStringProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a SimpleString.
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString
- */
- SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
- /**
- * @see #getSimpleStringProperty(SimpleString)
- */
SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
- /**
- * Returns the property corresponding to the specified key as a byte[].
- *
- * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[]
- */
+ byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+
+
+ Object removeProperty(SimpleString key);
+
+ boolean containsProperty(SimpleString key);
+
+ Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Object getObjectProperty(SimpleString key);
+
+ Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+ SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+ Message putStringProperty(SimpleString key, SimpleString value);
+
/**
- * @see #getBytesProperty(SimpleString)
+ * Returns the size of the <em>encoded</em> message.
*/
- byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+ int getEncodeSize();
/**
* Returns all the names of the properties for this message.
*/
Set<SimpleString> getPropertyNames();
+
+
+ int getRefCount();
+
+ int incrementRefCount() throws Exception;
+
+ int decrementRefCount() throws Exception;
+
+ int incrementDurableRefCount();
+
+ int decrementDurableRefCount();
+
/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
- Map<String, Object> toMap();
+ default Map<String, Object> toMap() {
+    // Start from the user properties; the header entries below are added with put(),
+    // so a header replaces any property that happens to use the same key.
+    Map<String, Object> map = toPropertyMap();
+    map.put("messageID", getMessageID());
+
+    // Read the user ID once so the null check and the formatted value use the same object.
+    Object userID = getUserID();
+    if (userID != null) {
+       map.put("userID", "ID:" + userID.toString());
+    }
+
+    map.put("address", getAddress());
+    map.put("type", getBodyType().toString());
+    map.put("durable", isDurable());
+    map.put("expiration", getExpiration());
+    map.put("timestamp", getTimestamp());
+    // The explicit cast makes the priority appear in the map as an Integer.
+    map.put("priority", (int) getPriority());
+
+    return map;
+ }
/**
* @return Returns the message properties in Map form, useful when encoding to JSON
*/
- Map<String, Object> toPropertyMap();
+ default Map<String, Object> toPropertyMap() {
+    // Typed map instead of a raw one, so the puts below are checked by the compiler.
+    Map<String, Object> map = new HashMap<>();
+    for (SimpleString name : getPropertyNames()) {
+       // Convert the property name once and reuse it for both the lookup and the entry key.
+       String key = name.toString();
+       map.put(key, getObjectProperty(key));
+    }
+    return map;
+ }
+
+
+ /**
+  * Converts this message into the Core format.
+  *
+  * @return the message in Core format. NOTE(review): whether implementations return a new
+  *         instance or {@code this} is not visible from this interface; confirm per implementation.
+  */
+ Message toCore();
+
+ int getMemoryEstimate();
+
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
new file mode 100644
index 0000000..64dd44d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Partial {@link Message} implementation that owns the durable and non-durable
+ * reference counters and reports every counter change to an optional
+ * {@link RefCountMessageListener}.
+ */
+public abstract class RefCountMessage implements Message {
+
+   /** Non-durable reference counter; starts at zero. */
+   private final AtomicInteger refCount = new AtomicInteger();
+
+   /** Durable reference counter; starts at zero. */
+   private final AtomicInteger durableRefCount = new AtomicInteger();
+
+   /** Listener told about counter changes; while {@code null} nobody is notified. */
+   private RefCountMessageListener context;
+
+   /**
+    * @return the listener currently attached to this message, or {@code null} if none was set
+    */
+   @Override
+   public RefCountMessageListener getContext() {
+      return this.context;
+   }
+
+   /**
+    * Attaches the listener that will be notified of reference-count changes.
+    *
+    * @param context the listener to notify; {@code null} turns notifications off
+    * @return this message
+    */
+   @Override
+   public Message setContext(RefCountMessageListener context) {
+      this.context = context;
+      return this;
+   }
+
+   /**
+    * @return the current non-durable reference count
+    */
+   @Override
+   public int getRefCount() {
+      return this.refCount.get();
+   }
+
+   /**
+    * Adds one to the non-durable reference count and, when a listener is attached,
+    * reports the new value through {@link RefCountMessageListener#nonDurableUp(Message, int)}.
+    *
+    * @return the non-durable reference count after the increment
+    */
+   @Override
+   public int incrementRefCount() throws Exception {
+      final int updated = this.refCount.incrementAndGet();
+      if (this.context == null) {
+         return updated;
+      }
+      this.context.nonDurableUp(this, updated);
+      return updated;
+   }
+
+   /**
+    * Subtracts one from the non-durable reference count and, when a listener is attached,
+    * reports the new value through {@link RefCountMessageListener#nonDurableDown(Message, int)}.
+    *
+    * @return the non-durable reference count after the decrement
+    */
+   @Override
+   public int decrementRefCount() throws Exception {
+      final int updated = this.refCount.decrementAndGet();
+      if (this.context == null) {
+         return updated;
+      }
+      this.context.nonDurableDown(this, updated);
+      return updated;
+   }
+
+   /**
+    * Adds one to the durable reference count and, when a listener is attached,
+    * reports the new value through {@link RefCountMessageListener#durableUp(Message, int)}.
+    *
+    * @return the durable reference count after the increment
+    */
+   @Override
+   public int incrementDurableRefCount() {
+      final int updated = this.durableRefCount.incrementAndGet();
+      if (this.context == null) {
+         return updated;
+      }
+      this.context.durableUp(this, updated);
+      return updated;
+   }
+
+   /**
+    * Subtracts one from the durable reference count and, when a listener is attached,
+    * reports the new value through {@link RefCountMessageListener#durableDown(Message, int)}.
+    *
+    * @return the durable reference count after the decrement
+    */
+   @Override
+   public int decrementDurableRefCount() {
+      final int updated = this.durableRefCount.decrementAndGet();
+      if (this.context == null) {
+         return updated;
+      }
+      this.context.durableDown(this, updated);
+      return updated;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
new file mode 100644
index 0000000..e68dffd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * Listener for reference-count operations on a {@link Message}.
+ * <p>
+ * When {@link Message#getContext()} returns a non-null instance of this interface,
+ * its methods are called during the message's refCount operations.
+ */
+public interface RefCountMessageListener {
+
+   /**
+    * Called when the durable reference count of a message goes up.
+    *
+    * @param message      the message whose count changed
+    * @param durableCount the durable reference count reported by the message
+    */
+   void durableUp(Message message, int durableCount);
+
+   /**
+    * Called when the durable reference count of a message goes down.
+    *
+    * @param message      the message whose count changed
+    * @param durableCount the durable reference count reported by the message
+    */
+   void durableDown(Message message, int durableCount);
+
+   /**
+    * Called when the non-durable reference count of a message goes up.
+    *
+    * @param message         the message whose count changed
+    * @param nonDurableCount the non-durable reference count reported by the message
+    */
+   void nonDurableUp(Message message, int nonDurableCount);
+
+   /**
+    * Called when the non-durable reference count of a message goes down.
+    *
+    * @param message         the message whose count changed
+    * @param nonDurableCount the non-durable reference count reported by the message
+    */
+   void nonDurableDown(Message message, int nonDurableCount);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
index e87d365..daded00 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
@@ -234,24 +234,16 @@ public interface ClientMessage extends Message {
* Overridden from {@link Message} to enable fluent API
*/
@Override
- ClientMessage putStringProperty(SimpleString key, SimpleString value);
-
- /**
- * Overridden from {@link Message} to enable fluent API
- */
- @Override
ClientMessage putStringProperty(String key, String value);
/**
* Overridden from {@link Message} to enable fluent API
*/
- @Override
ClientMessage writeBodyBufferBytes(byte[] bytes);
/**
* Overridden from {@link Message} to enable fluent API
*/
- @Override
ClientMessage writeBodyBufferString(String string);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
new file mode 100644
index 0000000..743583b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+/**
+ * Kinds of message body.
+ * <p>
+ * The constants are listed in their original order, so their ordinal values are unchanged.
+ */
+public enum BodyType {
+   Undefined,
+   Bytes,
+   Map,
+   Object,
+   Stream,
+   Text
+}