You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 17:21:08 UTC
[14/23] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index eb7cda1..2108be7 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
appendRecord(r);
}
@@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
@@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
appendRecord(r);
}
@@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendUpdateRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
@@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setTxId(txID);
appendRecord(r);
}
@@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
- r.setRecord(record);
+ r.setRecord(persister, record);
r.setTxId(txID);
appendRecord(r);
}
@@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
- r.setRecord(record);
+ r.setRecord(EncoderPersister.getInstance(), record);
r.setTxId(txID);
appendRecord(r);
}
@@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
- public void perfBlast(int pages) {
- }
-
- @Override
public void runDirectJournalBlast() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index 9691d3e..b094164 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -237,11 +238,11 @@ class JDBCJournalRecord {
this.record = record;
}
- public void setRecord(EncodingSupport record) {
- this.variableSize = record.getEncodeSize();
+ public void setRecord(Persister persister, Object record) {
+ this.variableSize = persister.getEncodeSize(record);
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
- record.encode(encodedBuffer);
+ persister.encode(encodedBuffer, record);
this.record = new ActiveMQBufferInputStream(encodedBuffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 59f04e8..6da3912 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
if (bodyLength == 0)
return null;
byte[] dst = new byte[bodyLength];
- message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst);
+ message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst);
return (T) dst;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 47dcfb2..80a07ac 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public String getJMSMessageID() {
if (msgID == null) {
- UUID uid = message.getUserID();
+ UUID uid = (UUID)message.getUserID();
msgID = uid == null ? null : "ID:" + uid.toString();
}
@@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
- SimpleString address = message.getAddress();
+ SimpleString address = message.getAddressSimpleString();
String prefix = "";
if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
@@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@SuppressWarnings("unchecked")
protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException {
- InputStream is = ((MessageInternal) message).getBodyInputStream();
+ InputStream is = ((ClientMessageInternal) message).getBodyInputStream();
try {
ObjectInputStream ois = new ObjectInputStream(is);
return (T) ois.readObject();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
index 6cf20ff..289f88c 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jms.transaction;
import javax.transaction.xa.Xid;
import java.util.Map;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionDetail;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
@@ -36,7 +36,7 @@ public class JMSTransactionDetail extends TransactionDetail {
}
@Override
- public String decodeMessageType(ServerMessage msg) {
+ public String decodeMessageType(Message msg) {
int type = msg.getType();
switch (type) {
case ActiveMQMessage.TYPE: // 0
@@ -57,7 +57,7 @@ public class JMSTransactionDetail extends TransactionDetail {
}
@Override
- public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+ public Map<String, Object> decodeMessageProperties(Message msg) {
try {
return ActiveMQMessage.coreMaptoJMSMap(msg.toMap());
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
new file mode 100644
index 0000000..8fc2a5aa
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+/** This is a facade between the new Persister and the former EncodingSupport.
+ * Methods using the old interface will use this as a facade to provide the previous semantics. */
+public class EncoderPersister implements Persister<EncodingSupport> {
+
+ private static final EncoderPersister theInstance = new EncoderPersister();
+
+ private EncoderPersister() {
+ }
+
+ public static EncoderPersister getInstance() {
+ return theInstance;
+ }
+
+ @Override
+ public int getEncodeSize(EncodingSupport record) {
+ return record.getEncodeSize();
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer, EncodingSupport record) {
+ record.encode(buffer);
+ }
+
+ @Override
+ public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) {
+ record.decode(buffer);
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index fbd4182..ca194b8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
/**
@@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent {
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+ }
+
+ void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion completionCallback) throws Exception;
+ default void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception {
+ appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+ }
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+ }
- void appendUpdateRecord(long id,
+ void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
+
+ default void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
- IOCompletion completionCallback) throws Exception;
+ IOCompletion completionCallback) throws Exception {
+ appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+ }
+
+ void appendUpdateRecord(final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
void appendDeleteRecord(long id, boolean sync) throws Exception;
@@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent {
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+ appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+ }
+
+ void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record) throws Exception;
void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+ appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+ }
+
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception;
void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
@@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent {
int getUserVersion();
- void perfBlast(int pages);
-
void runDirectJournalBlast() throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 8bbecd2..943077c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
}
}
- JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
+ JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0b702a5..8e5ca2c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase {
@Override
public void appendAddRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion callback) throws Exception {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
writeRecord(addRecord, sync, callback);
}
@@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase {
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
count(txID);
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
writeRecord(addRecord, false, null);
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
- EncodingSupport record,
+ Persister persister,
+ Object record,
boolean sync,
IOCompletion callback) throws Exception {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
writeRecord(updateRecord, sync, callback);
}
@@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase {
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
- EncodingSupport record) throws Exception {
+ Persister persister,
+ Object record) throws Exception {
count(txID);
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
writeRecord(updateRecordTX, false, null);
}
@@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase {
}
@Override
- public void perfBlast(int pages) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void runDirectJournalBlast() throws Exception {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index e2ca84d..e6bd99e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
abstract class JournalBase implements Journal {
@@ -37,68 +38,15 @@ abstract class JournalBase implements Journal {
}
@Override
- public abstract void appendAddRecord(final long id,
- final byte recordType,
- final EncodingSupport record,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendAddRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendCommitRecord(final long txID,
- final boolean sync,
- final IOCompletion callback,
- boolean lineUpContext) throws Exception;
-
- @Override
- public abstract void appendDeleteRecord(final long id,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendDeleteRecordTransactional(final long txID,
- final long id,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendPrepareRecord(final long txID,
- final EncodingSupport transactionData,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendUpdateRecord(final long id,
- final byte recordType,
- final EncodingSupport record,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
- public abstract void appendUpdateRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception;
-
- @Override
- public abstract void appendRollbackRecord(final long txID,
- final boolean sync,
- final IOCompletion callback) throws Exception;
-
- @Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+ public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
- appendAddRecord(id, recordType, record, sync, callback);
+ appendAddRecord(id, recordType, persister, record, sync, callback);
if (callback != null) {
callback.waitCompletion();
@@ -176,11 +124,12 @@ abstract class JournalBase implements Journal {
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
- appendUpdateRecord(id, recordType, record, sync, callback);
+ appendUpdateRecord(id, recordType, persister, record, sync, callback);
if (callback != null) {
callback.waitCompletion();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index b95d641..c62b27b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
@@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
if (lookupRecord(info.id)) {
- JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+ JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short) (info.compactCount + 1));
checkSize(addRecord.getEncodeSize(), info.compactCount);
@@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+ JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data));
record.setCompactCount((short) (info.compactCount + 1));
@@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
if (lookupRecord(info.id)) {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short) (info.compactCount + 1));
@@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short) (info.compactCount + 1));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index db615f8..24bb916 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX;
@@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void appendAddRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
@@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void run() {
journalLock.readLock().lock();
try {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final EncodingSupport record,
+ final Persister persister,
+ final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
@@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.readLock().lock();
try {
JournalRecord jrnRecord = records.get(id);
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
@@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
@@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void run() {
journalLock.readLock().lock();
try {
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) {
@@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception {
+ final Persister persister,
+ final Object record) throws Exception {
checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
@@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.readLock().lock();
try {
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
if ( logger.isTraceEnabled() ) {
@@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- @Override
- public void perfBlast(final int pages) {
-
- checkJournalIsLoaded();
-
- final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
- final JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
- @Override
- public int getEncodeSize() {
- return byteEncoder.getEncodeSize();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- byteEncoder.encode(buffer);
- }
- };
-
- appendExecutor.execute(new Runnable() {
- @Override
- public void run() {
- journalLock.readLock().lock();
- try {
-
- for (int i = 0; i < pages; i++) {
- appendRecord(blastRecord, false, false, null, null);
- }
-
- } catch (Exception e) {
- ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
- } finally {
- journalLock.readLock().unlock();
- }
- }
- });
- }
-
// ActiveMQComponent implementation
// ---------------------------------------------------
@@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public int getCompactCount() {
return compactCount;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index c6a5d4a..6e5b651 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -17,14 +17,16 @@
package org.apache.activemq.artemis.core.journal.impl.dataformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecord extends JournalInternalRecord {
protected final long id;
- protected final EncodingSupport record;
+ protected final Persister persister;
+
+ protected final Object record;
protected final byte recordType;
@@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord {
* @param recordType
* @param record
*/
- public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) {
+ public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) {
this.id = id;
this.record = record;
@@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord {
this.recordType = recordType;
this.add = add;
+
+ this.persister = persister;
}
@Override
@@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord {
buffer.writeLong(id);
- buffer.writeInt(record.getEncodeSize());
+ int recordEncodeSize = persister.getEncodeSize(record);
+
+ buffer.writeInt(persister.getEncodeSize(record));
buffer.writeByte(recordType);
- record.encode(buffer);
+ persister.encode(buffer, record);
- buffer.writeInt(getEncodeSize());
+ buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1);
}
@Override
public int getEncodeSize() {
- return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
+ return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
index 6cec122..483418f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
@@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.journal.impl.dataformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecordTX extends JournalInternalRecord {
@@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord {
private final long id;
- private final EncodingSupport record;
+ protected final Persister persister;
+
+ protected final Object record;
private final byte recordType;
@@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord {
final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) {
+ final Persister persister,
+ Object record) {
this.txID = txID;
this.id = id;
+ this.persister = persister;
+
this.record = record;
this.recordType = recordType;
@@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord {
buffer.writeLong(id);
- buffer.writeInt(record.getEncodeSize());
+ buffer.writeInt(persister.getEncodeSize(record));
buffer.writeByte(recordType);
- record.encode(buffer);
+ persister.encode(buffer, record);
buffer.writeInt(getEncodeSize());
}
@Override
public int getEncodeSize() {
- return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
+ return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
new file mode 100644
index 0000000..ee2f870
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.apache.qpid.proton.util.TLSEncoder;
+
+/**
+ * Broker-side view of an AMQP message.
+ * <p>
+ * The message is held either as encoded bytes ({@link #data}), as a decoded proton
+ * {@link MessageImpl} ({@link #protonMessage}), or both. Individual sections (header, properties, ...)
+ * are decoded lazily from {@link #data} through {@link #partialDecode(ByteBuffer, boolean)}.
+ * <p>
+ * Many methods of the generic message contract are still stubs in this class; they are grouped and
+ * marked as such below.
+ */
+// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
+public class AMQPMessage extends RefCountMessage {
+
+   final long messageFormat;
+   private ProtonProtocolManager protocolManager;
+   // encoded form of the message; null when only the proton object is available or after messageChanged()
+   ByteBuf data;
+   // false means checkBuffer() has to (re)encode protonMessage into data before data is used
+   boolean bufferValid;
+   byte type;
+   long messageID;
+   String address;
+   MessageImpl protonMessage;
+   private long expiration = 0;
+   // this can be used to encode the header again and the rest of the message buffer
+   private int headerEnd = -1;
+   private Header _header;
+   private DeliveryAnnotations _deliveryAnnotations;
+   private MessageAnnotations _messageAnnotations;
+   private Properties _properties;
+   private ApplicationProperties applicationProperties;
+
+   /**
+    * Creates a message from its already encoded form.
+    *
+    * @param messageFormat   the AMQP message format value
+    * @param data            the encoded message; wrapped, not copied
+    * @param protocolManager the owning protocol manager
+    */
+   public AMQPMessage(long messageFormat, byte[] data, ProtonProtocolManager protocolManager) {
+      this.protocolManager = protocolManager;
+      this.data = Unpooled.wrappedBuffer(data);
+      this.messageFormat = messageFormat;
+      this.bufferValid = true;
+
+   }
+
+   /** for persistence reload */
+   public AMQPMessage(long messageFormat) {
+      this.messageFormat = messageFormat;
+      this.bufferValid = false;
+
+   }
+
+   /**
+    * Creates a message from a decoded proton message; the encoded form is produced on demand.
+    *
+    * @param messageFormat   the AMQP message format value
+    * @param message         the proton message, expected to be a {@link MessageImpl}
+    * @param protocolManager the owning protocol manager
+    */
+   public AMQPMessage(long messageFormat, Message message, ProtonProtocolManager protocolManager) {
+      this.protocolManager = protocolManager;
+      this.protonMessage = (MessageImpl)message;
+      this.messageFormat = messageFormat;
+
+   }
+
+   /** Same as {@link #AMQPMessage(long, Message, ProtonProtocolManager)} with a message format of 0. */
+   public AMQPMessage(Message message, ProtonProtocolManager protocolManager) {
+      this(0, message, protocolManager);
+   }
+
+   /**
+    * Returns the proton message, decoding it from {@link #data} on first use.
+    * <p>
+    * When decoded from {@link #data}, the header is moved into this object's own header field and
+    * removed from the returned proton message.
+    */
+   public MessageImpl getProtonMessage() {
+      if (protonMessage == null) {
+         protonMessage = (MessageImpl) Message.Factory.create();
+
+         if (data != null) {
+            // rewind so that nioBuffer() covers the whole encoded message
+            data.readerIndex(0);
+            protonMessage.decode(data.nioBuffer());
+            this._header = protonMessage.getHeader();
+            protonMessage.setHeader(null);
+         }
+      }
+
+      return protonMessage;
+   }
+
+   /** Creates empty sections and an empty proton message when there is neither a proton message nor encoded data. */
+   private void initializeObjects() {
+      if (protonMessage == null) {
+         if (data == null) {
+            this.headerEnd = -1;
+            _header = new Header();
+            _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+            _properties = new Properties();
+            this.applicationProperties = new ApplicationProperties(new HashMap<>());
+            this.protonMessage = (MessageImpl)Message.Factory.create();
+            this.protonMessage.setApplicationProperties(applicationProperties);
+            this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
+         }
+      }
+   }
+
+   private ApplicationProperties getApplicationProperties() {
+      if (applicationProperties == null) {
+         if (data != null) {
+            partialDecode(data.nioBuffer(), true);
+         } else {
+            initializeObjects();
+         }
+      }
+
+      return applicationProperties;
+   }
+
+   /**
+    * Returns the header section, decoding it from {@link #data} if needed.
+    *
+    * @return the header, or null when none is available
+    */
+   public Header getHeader() {
+      if (_header == null) {
+         if (data == null) {
+            initializeObjects();
+         } else {
+            partialDecode(this.data.nioBuffer(), false);
+         }
+      }
+
+      return _header;
+   }
+
+   /**
+    * Returns the properties section, decoding it from {@link #data} if needed.
+    *
+    * @return the properties, or null when none is available
+    */
+   public Properties getProperties() {
+      if (_properties == null) {
+         if (data == null) {
+            initializeObjects();
+         } else {
+            partialDecode(this.data.nioBuffer(), true);
+         }
+      }
+
+      return _properties;
+   }
+
+   @Override
+   public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+      return AMQPMessagePersister.getInstance();
+   }
+
+   /** Reads the next section from the decoder, or returns null when the buffer is exhausted. */
+   private static Section readNextSection(DecoderImpl decoder, ByteBuffer buffer) {
+      return buffer.hasRemaining() ? (Section) decoder.readObject() : null;
+   }
+
+   /**
+    * Decodes the leading sections of the encoded message into the cached section fields.
+    * <p>
+    * All cached sections are reset first. Sections are then read in the order header, delivery
+    * annotations, message annotations, properties, application properties; each one is optional.
+    *
+    * @param buffer                    the encoded message; its position is reset to 0
+    * @param readApplicationProperties when false, decoding stops right after the (optional) header
+    */
+   private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) {
+      DecoderImpl decoder = TLSEncoder.getDecoder();
+      decoder.setByteBuffer(buffer);
+      buffer.position(0);
+
+      _header = null;
+      _deliveryAnnotations = null;
+      _messageAnnotations = null;
+      _properties = null;
+      applicationProperties = null;
+      Section section = null;
+
+      try {
+         if (buffer.hasRemaining()) {
+            section = (Section) decoder.readObject();
+         }
+
+         if (section instanceof Header) {
+            headerEnd = buffer.position();
+            _header = (Header) section;
+
+            if (!readApplicationProperties) {
+               return;
+            }
+
+            section = readNextSection(decoder, buffer);
+         }
+
+         if (!readApplicationProperties) {
+            return;
+         }
+         if (section instanceof DeliveryAnnotations) {
+            _deliveryAnnotations = (DeliveryAnnotations) section;
+            section = readNextSection(decoder, buffer);
+         }
+         if (section instanceof MessageAnnotations) {
+            _messageAnnotations = (MessageAnnotations) section;
+            section = readNextSection(decoder, buffer);
+         }
+         if (section instanceof Properties) {
+            _properties = (Properties) section;
+
+            // The header section is optional, so it may be absent even though properties are present.
+            // The TTL is an unsigned 32-bit value: read it as a long so large values do not go negative.
+            // NOTE(review): the expiration is only derived when a properties section exists, and is
+            // recomputed from the current time on every decode - confirm this is intended.
+            if (_header != null && _header.getTtl() != null) {
+               this.expiration = System.currentTimeMillis() + _header.getTtl().longValue();
+            }
+
+            section = readNextSection(decoder, buffer);
+         }
+         if (section instanceof ApplicationProperties) {
+            applicationProperties = (ApplicationProperties) section;
+         }
+      } finally {
+         // the decoder is obtained from TLSEncoder; do not leave it pointing at this buffer
+         decoder.setByteBuffer(null);
+      }
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
+   }
+
+   /** Returns the length of the encoded message, encoding it first if needed. */
+   public int getLength() {
+      checkBuffer();
+      return data.array().length;
+   }
+
+   /** Returns the array backing the encoded message, encoding it first if needed. */
+   public byte[] getArray() {
+      checkBuffer();
+      return data.array();
+   }
+
+   /** Drops the encoded form so that it is rebuilt from the proton message on next use. */
+   @Override
+   public void messageChanged() {
+      bufferValid = false;
+      this.data = null;
+   }
+
+   // TODO-now this only make sense on Core
+   @Override
+   public ActiveMQBuffer getBodyBuffer() {
+      return null;
+   }
+
+   // TODO-now this only make sense on Core
+   @Override
+   public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      return null;
+   }
+
+   // TODO: Refactor Large message
+   @Override
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+      return null;
+   }
+
+   @Override
+   public byte getType() {
+      // TODO-now: what to do here?
+      return type;
+   }
+
+   @Override
+   public AMQPMessage setType(byte type) {
+      this.type = type;
+      return this;
+   }
+
+   @Override
+   public boolean isLargeMessage() {
+      return false;
+   }
+
+   /** Returns a wrapper around the encoded message, or null when there is no encoded form. */
+   @Override
+   public ByteBuf getBuffer() {
+      if (data == null) {
+         return null;
+      } else {
+         return Unpooled.wrappedBuffer(data);
+      }
+   }
+
+   // NOTE(review): the supplied buffer is ignored and the encoded form is simply dropped - confirm intended
+   @Override
+   public AMQPMessage setBuffer(ByteBuf buffer) {
+      this.data = null;
+      return this;
+   }
+
+   /**
+    * Creates a new message sharing this message's format, encoded bytes and protocol manager.
+    * Other state (message id, address, type, expiration) is not carried over.
+    */
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy() {
+      // TODO-now: what to do with this?
+      // data is null for messages built from a proton Message or after messageChanged()
+      checkBuffer();
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager);
+      return newEncode;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      return copy().setMessageID(newID);
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setMessageID(long id) {
+      this.messageID = id;
+      return this;
+   }
+
+   @Override
+   public long getExpiration() {
+      return expiration;
+   }
+
+   @Override
+   public AMQPMessage setExpiration(long expiration) {
+      this.expiration = expiration;
+      return this;
+   }
+
+   @Override
+   public Object getUserID() {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
+      return null;
+   }
+
+   @Override
+   public void copyHeadersAndProperties(org.apache.activemq.artemis.api.core.Message msg) {
+
+   }
+
+   /**
+    * Returns the durable flag of the header.
+    *
+    * @return false when there is no header or the header does not carry a durable value
+    */
+   @Override
+   public boolean isDurable() {
+      Header header = getHeader();
+      // the durable field is optional; an absent value must not be unboxed
+      if (header != null && header.getDurable() != null) {
+         return header.getDurable().booleanValue();
+      } else {
+         return false;
+      }
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
+      return null;
+   }
+
+   @Override
+   public Object getProtocol() {
+      return protocolManager;
+   }
+
+   @Override
+   public AMQPMessage setProtocol(Object protocol) {
+      this.protocolManager = (ProtonProtocolManager)protocol;
+      return this;
+   }
+
+   @Override
+   public Object getBody() {
+      return null;
+   }
+
+   @Override
+   public BodyType getBodyType() {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setBody(BodyType type, Object body) {
+      return null;
+   }
+
+   /**
+    * Returns the address set through {@code setAddress}, falling back to the "to" field of the
+    * proton message's properties.
+    *
+    * @return the address, or null when neither is available
+    */
+   @Override
+   public String getAddress() {
+      if (address == null) {
+         Properties properties = getProtonMessage().getProperties();
+         if (properties != null) {
+            return properties.getTo();
+         } else {
+            return null;
+         }
+      } else {
+         return address;
+      }
+   }
+
+   @Override
+   public AMQPMessage setAddress(SimpleString address) {
+      // a null address clears the explicit address instead of failing
+      return setAddress(address == null ? null : address.toString());
+   }
+
+   @Override
+   public AMQPMessage setAddress(String address) {
+      this.address = address;
+      return this;
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return SimpleString.toSimpleString(getAddress());
+   }
+
+   @Override
+   public long getTimestamp() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
+      return null;
+   }
+
+   @Override
+   public byte getPriority() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
+      return null;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+
+   }
+
+   /**
+    * Makes sure {@link #data} holds the encoded message, encoding the proton message when the
+    * buffer is flagged as not valid.
+    */
+   private synchronized void checkBuffer() {
+      // NOTE(review): bufferValid is not set back to true here, so the message is re-encoded on
+      // every call while the flag is false - confirm whether that is intended
+      if (!bufferValid) {
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+         try {
+            getProtonMessage().encode(new NettyWritable(buffer));
+            byte[] bytes = new byte[buffer.writerIndex()];
+            buffer.readBytes(bytes);
+            this.data = Unpooled.wrappedBuffer(bytes);
+         } finally {
+            buffer.release();
+         }
+      }
+   }
+
+   /**
+    * Writes the encoded message into the given buffer.
+    *
+    * @param buffer        the target buffer
+    * @param deliveryCount currently unused
+    */
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+      // TODO: do I need to change the Header with deliveryCount?
+      // I would send a new instance of Header with a new delivery count, and only send partial of the buffer
+      // previously received
+      checkBuffer();
+      // index-based copy: leaves data's reader index untouched so the message can be sent and
+      // decoded again afterwards
+      buffer.writeBytes(data, 0, data.writerIndex());
+   }
+
+   // ---------------------------------------------------------------------------------------------
+   // Property accessors: not implemented for AMQP messages yet. The put* methods store nothing and
+   // return null (not this), the get* methods return null / false / an empty array.
+   // ---------------------------------------------------------------------------------------------
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object removeProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public boolean containsProperty(String key) {
+      return false;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public Object removeProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public boolean containsProperty(SimpleString key) {
+      return false;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
+      return null;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      return 0;
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      return Collections.emptySet();
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      return 0;
+   }
+
+   /** Not implemented yet: triggers the lazy decode of the proton message and returns null. */
+   @Override
+   public org.apache.activemq.artemis.api.core.Message toCore() {
+      getProtonMessage();
+      return null;
+   }
+
+   /** Returns the number of bytes {@link #persist(ActiveMQBuffer)} writes: an int length plus the encoded message. */
+   @Override
+   public int getPersistSize() {
+      checkBuffer();
+      return data.array().length + DataConstants.SIZE_INT;
+   }
+
+   /** Writes the encoded message, prefixed by its length as an int, into the target record. */
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      checkBuffer();
+      targetRecord.writeInt(data.array().length);
+      targetRecord.writeBytes(data.array());
+   }
+
+   /** Reads back what {@link #persist(ActiveMQBuffer)} wrote and marks the encoded form as valid. */
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      byte[] recordArray = new byte[size];
+      record.readBytes(recordArray);
+      this.data = Unpooled.wrappedBuffer(recordArray);
+      this.bufferValid = true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
new file mode 100644
index 0000000..3b5bdda
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * Persister for {@link AMQPMessage} records. It identifies itself with
+ * {@link ProtonProtocolManagerFactory#ID} and stores, per message, the message id, the message
+ * format, the (nullable) address and the message's own persisted payload.
+ */
+public class AMQPMessagePersister extends MessagePersister {
+
+ /** Sole instance; final so the singleton cannot be swapped out. Prefer {@link #getInstance()}. */
+ public static final AMQPMessagePersister theInstance = new AMQPMessagePersister();
+
+ /** @return the shared persister instance */
+ public static AMQPMessagePersister getInstance() {
+ return theInstance;
+ }
+
+ /** Not instantiable from outside; use {@link #getInstance()}. */
+ private AMQPMessagePersister() {
+ }
+
+ /** @return {@link ProtonProtocolManagerFactory#ID} */
+ @Override
+ protected byte getID() {
+ return ProtonProtocolManagerFactory.ID;
+ }
+
+ /**
+ * Computes the number of bytes reserved for one record: one byte, the message's own persist
+ * size, the nullable address, and two longs (message id and message format).
+ * <p>
+ * The single byte presumably accounts for the protocol id emitted by {@code super.encode} -
+ * confirm against {@code MessagePersister}.
+ *
+ * @param record message to size
+ * @return the encoded size in bytes
+ */
+ @Override
+ public int getEncodeSize(Message record) {
+ return DataConstants.SIZE_BYTE + record.getPersistSize() +
+ SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+ }
+
+ /**
+ * Writes {@code record} to {@code buffer}. After whatever {@code super.encode} emits (presumably
+ * the one-byte protocol id from {@link #getID()} - confirm), the layout is: message id (long),
+ * message format (long), nullable address, then the message's own persisted form.
+ *
+ * @param buffer destination buffer
+ * @param record message to store; must be an {@link AMQPMessage}, otherwise the cast fails with
+ * a {@link ClassCastException}
+ */
+ @Override
+ public void encode(ActiveMQBuffer buffer, Message record) {
+ super.encode(buffer, record);
+ AMQPMessage msgEncode = (AMQPMessage)record;
+ buffer.writeLong(record.getMessageID());
+ buffer.writeLong(msgEncode.getMessageFormat());
+ buffer.writeNullableSimpleString(record.getAddressSimpleString());
+ record.persist(buffer);
+ }
+
+ /**
+ * Rebuilds a message from {@code buffer}, reading the fields in the order {@code encode} writes
+ * them after the superclass portion: message id, message format, nullable address, payload.
+ * <p>
+ * NOTE(review): whatever {@code super.encode} wrote is not read here, so it is presumably
+ * consumed by the caller before this method runs - confirm.
+ *
+ * @param buffer source buffer positioned at the message id
+ * @param record ignored; a new {@link AMQPMessage} is always created
+ * @return the newly created message, with its address set only when one was stored
+ */
+ @Override
+ public Message decode(ActiveMQBuffer buffer, Message record) {
+ final long id = buffer.readLong();
+ final long format = buffer.readLong();
+ final SimpleString address = buffer.readNullableSimpleString();
+ // A separate local leaves the (unused) parameter untouched instead of overwriting it.
+ final Message decoded = new AMQPMessage(format);
+ decoded.reloadPersistence(buffer);
+ decoded.setMessageID(id);
+ if (address != null) {
+ decoded.setAddress(address);
+ }
+ return decoded;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 18c6b05..0b02838 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
@@ -34,14 +35,12 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -69,7 +68,6 @@ import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
-import io.netty.buffer.ByteBuf;
import org.jboss.logging.Logger;
public class AMQPSessionCallback implements SessionCallback {
@@ -298,11 +296,11 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception {
+ public long encodeMessage(Message message, int deliveryCount, WritableBuffer buffer) throws Exception {
ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter();
// The Proton variant accepts a WritableBuffer to allow for a faster more direct encode.
- return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer);
+ return (long) converter.outbound(message, deliveryCount, buffer);
}
public String tempQueueName() {
@@ -321,22 +319,22 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+ public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
if (transaction == null) {
transaction = serverSession.getCurrentTransaction();
}
recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
+ ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
} finally {
resetContext();
}
}
- public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
+ public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
+ ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
} finally {
resetContext();
}
@@ -351,11 +349,8 @@ public class AMQPSessionCallback implements SessionCallback {
final Delivery delivery,
String address,
int messageFormat,
- ByteBuf messageEncoded) throws Exception {
- EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
-
- ServerMessage message = manager.getConverter().inbound(encodedMessage);
- //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+ byte[] data) throws Exception {
+ AMQPMessage message = new AMQPMessage(messageFormat, data, manager);
if (address != null) {
message.setAddress(new SimpleString(address));
} else {
@@ -372,7 +367,7 @@ public class AMQPSessionCallback implements SessionCallback {
recoverContext();
- PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+ PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
if (store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx)
if (delivery.remotelySettled()) {
@@ -401,7 +396,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
private void serverSend(final Transaction transaction,
- final ServerMessage message,
+ final Message message,
final Delivery delivery,
final Receiver receiver) throws Exception {
try {
@@ -416,8 +411,8 @@ public class AMQPSessionCallback implements SessionCallback {
synchronized (connection.getLock()) {
delivery.disposition(Accepted.getInstance());
delivery.settle();
- connection.flush();
}
+ connection.flush(true);
}
@Override
@@ -492,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
@Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
@@ -512,7 +507,7 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference ref,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index bef8ef0..98ec228 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -22,6 +22,8 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -32,6 +34,8 @@ import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+ public static final byte ID = 2;
+
private static final String AMQP_PROTOCOL_NAME = "AMQP";
private static final String MODULE_NAME = "artemis-amqp-protocol";
@@ -39,6 +43,16 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override
+ public byte getStoreID() {
+ return ID;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return AMQPMessagePersister.getInstance();
+ }
+
+ @Override
public ProtocolManager createProtocolManager(ActiveMQServer server,
final Map<String, Object> parameters,
List<BaseInterceptor> incomingInterceptors,