You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/01/13 15:49:41 UTC
[2/8] activemq-artemis git commit: ARTEMIS-27 / ARTEMIS-338 Refactor
Journal Encodings into new package
ARTEMIS-27 / ARTEMIS-338 Refactor Journal Encodings into new package
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9b351d82
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9b351d82
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9b351d82
Branch: refs/heads/master
Commit: 9b351d82368723fc2d549d2ce8f952d9def1136f
Parents: aab09a7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Jan 6 13:50:35 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 13 09:38:08 2016 -0500
----------------------------------------------------------------------
.../artemis/cli/commands/tools/PrintData.java | 9 +-
.../cli/commands/tools/XmlDataExporter.java | 8 +-
.../core/journal/PreparedTransactionInfo.java | 24 +-
.../artemis/core/journal/impl/JournalImpl.java | 4 +-
.../impl/dataformat/JournalAddRecord.java | 8 +-
.../persistence/impl/journal/AckDescribe.java | 34 +
.../impl/journal/AddMessageRecord.java | 7 +-
.../impl/journal/DescribeJournal.java | 57 +-
.../impl/journal/DummyOperationContext.java | 63 ++
.../impl/journal/JournalRecordIds.java | 4 +-
.../impl/journal/JournalStorageManager.java | 971 +------------------
.../journal/LargeMessageTXFailureCallback.java | 64 ++
.../TXLargeMessageConfirmationOperation.java | 46 +
.../journal/codec/CursorAckRecordEncoding.java | 61 ++
.../impl/journal/codec/DeleteEncoding.java | 59 ++
.../codec/DeliveryCountUpdateEncoding.java | 57 ++
.../impl/journal/codec/DuplicateIDEncoding.java | 105 ++
.../codec/FinishPageMessageOperation.java | 55 ++
.../impl/journal/codec/GroupingEncoding.java | 75 ++
.../codec/HeuristicCompletionEncoding.java | 58 ++
.../journal/codec/LargeMessageEncoding.java | 52 +
.../journal/codec/PageCountPendingImpl.java | 79 ++
.../impl/journal/codec/PageCountRecord.java | 68 ++
.../impl/journal/codec/PageCountRecordInc.java | 64 ++
.../journal/codec/PageUpdateTXEncoding.java | 64 ++
.../codec/PendingLargeMessageEncoding.java | 60 ++
.../codec/PersistentQueueBindingEncoding.java | 142 +++
.../impl/journal/codec/QueueEncoding.java | 52 +
.../impl/journal/codec/RefEncoding.java | 28 +
.../codec/ScheduledDeliveryEncoding.java | 54 ++
.../impl/journal/codec/XidEncoding.java | 52 +
.../journal/impl/AlignedJournalImplTest.java | 16 +-
.../core/journal/impl/JournalImplTestBase.java | 14 +-
33 files changed, 1521 insertions(+), 993 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index c148484..cdc1bbf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -47,7 +47,8 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -235,7 +236,7 @@ public class PrintData extends LockAbstract {
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
- JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
@@ -248,7 +249,7 @@ public class PrintData extends LockAbstract {
set.add(encoding.position);
}
else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE) {
- JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Long queueID = Long.valueOf(encoding.queueID);
@@ -260,7 +261,7 @@ public class PrintData extends LockAbstract {
}
else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
if (record.isUpdate) {
- JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index 8994c72..2556db0 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -71,10 +71,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
index 82abc4f..afad7d7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
@@ -21,17 +21,33 @@ import java.util.List;
public class PreparedTransactionInfo {
- public final long id;
+ private final long id;
- public final byte[] extraData;
+ private final byte[] extraData;
- public final List<RecordInfo> records = new ArrayList<>();
+ private final List<RecordInfo> records = new ArrayList<RecordInfo>();
- public final List<RecordInfo> recordsToDelete = new ArrayList<>();
+ private final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public PreparedTransactionInfo(final long id, final byte[] extraData) {
this.id = id;
this.extraData = extraData;
}
+
+ public long getId() {
+ return id;
+ }
+
+ public byte[] getExtraData() {
+ return extraData;
+ }
+
+ public List<RecordInfo> getRecords() {
+ return records;
+ }
+
+ public List<RecordInfo> getRecordsToDelete() {
+ return recordsToDelete;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 14b6d92..ef6de60 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -1786,9 +1786,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
- info.records.addAll(transaction.recordInfos);
+ info.getRecords().addAll(transaction.recordInfos);
- info.recordsToDelete.addAll(transaction.recordsToDelete);
+ info.getRecordsToDelete().addAll(transaction.recordsToDelete);
loadManager.addPreparedTransaction(info);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index 69734bc..aa0e961 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -22,13 +22,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecord extends JournalInternalRecord {
- private final long id;
+ protected final long id;
- private final EncodingSupport record;
+ protected final EncodingSupport record;
- private final byte recordType;
+ protected final byte recordType;
- private final boolean add;
+ protected final boolean add;
/**
* @param id
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
new file mode 100644
index 0000000..32e0f4d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+
+public final class AckDescribe {
+
+ public RefEncoding refEncoding;
+
+ public AckDescribe(RefEncoding refEncoding) {
+ this.refEncoding = refEncoding;
+ }
+
+ @Override
+ public String toString() {
+ return "ACK;" + refEncoding;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
index 49ac289..fdae483 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
@@ -26,9 +26,11 @@ public final class AddMessageRecord {
final ServerMessage message;
- long scheduledDeliveryTime;
+ // mtaylor (Added to compile)
+ public long scheduledDeliveryTime;
- int deliveryCount;
+ // mtaylor (Added to compile)
+ public int deliveryCount;
public ServerMessage getMessage() {
return message;
@@ -41,4 +43,5 @@ public final class AddMessageRecord {
public int getDeliveryCount() {
return deliveryCount;
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index d41f0ed..f3ecd76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -28,31 +28,30 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DeliveryCountUpdateEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DuplicateIDEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.HeuristicCompletionEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.LargeMessageEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountPendingImpl;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecord;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecordInc;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PendingLargeMessageEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.RefEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.ScheduledDeliveryEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@@ -200,15 +199,15 @@ public final class DescribeJournal {
public void checkRecordCounter(RecordInfo info) {
if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) newObjectEncoding(info);
- long queueIDForCounter = encoding.queueID;
+ long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
- if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.value) {
- out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.value);
+ if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.getValue()) {
+ out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
}
- subsCounter.loadValue(info.id, encoding.value);
+ subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
if (subsCounter.getValue() < 0) {
@@ -221,13 +220,13 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info);
- long queueIDForCounter = encoding.queueID;
+ long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
- subsCounter.loadInc(info.id, encoding.value);
+ subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
- out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.value);
+ out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
}
@@ -311,20 +310,20 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) o;
- queueIDForCounter = encoding.queueID;
+ queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
- subsCounter.loadValue(info.id, encoding.value);
+ subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) o;
- queueIDForCounter = encoding.queueID;
+ queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
- subsCounter.loadInc(info.id, encoding.value);
+ subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
}
@@ -345,8 +344,8 @@ public final class DescribeJournal {
out.println("### Prepared TX ###");
for (PreparedTransactionInfo tx : preparedTransactions) {
- out.println(tx.id);
- for (RecordInfo info : tx.records) {
+ out.println(tx.getId());
+ for (RecordInfo info : tx.getRecords()) {
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o));
if (info.getUserRecordType() == 31) {
@@ -365,7 +364,7 @@ public final class DescribeJournal {
}
}
- for (RecordInfo info : tx.recordsToDelete) {
+ for (RecordInfo info : tx.getRecordsToDelete()) {
out.println("- " + describeRecord(info) + " <marked to delete>");
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
new file mode 100644
index 0000000..194d2b1
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+
+final class DummyOperationContext implements OperationContext {
+
+ private static DummyOperationContext instance = new DummyOperationContext();
+
+ public static OperationContext getInstance() {
+ return DummyOperationContext.instance;
+ }
+
+ public void executeOnCompletion(final IOCallback runnable) {
+ // There are no executeOnCompletion calls while using the DummyOperationContext
+ // However we keep the code here for correctness
+ runnable.done();
+ }
+
+ public void replicationDone() {
+ }
+
+ public void replicationLineUp() {
+ }
+
+ public void storeLineUp() {
+ }
+
+ public void done() {
+ }
+
+ public void onError(final int errorCode, final String errorMessage) {
+ }
+
+ public void waitCompletion() {
+ }
+
+ public boolean waitCompletion(final long timeout) {
+ return true;
+ }
+
+ public void pageSyncLineUp() {
+ }
+
+ public void pageSyncDone() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 5b12345..0242b50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -26,7 +26,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
public final class JournalRecordIds {
// grouping journal record type
- static final byte GROUP_RECORD = 20;
+
+ // mtaylor Added to compile
+ public static final byte GROUP_RECORD = 20;
// BindingsImpl journal record type
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 390e742..c272a12 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -27,12 +27,10 @@ import java.security.MessageDigest;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -62,12 +60,10 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@@ -78,7 +74,6 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -87,6 +82,23 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -106,22 +118,14 @@ import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.Transaction.State;
-import org.apache.activemq.artemis.core.transaction.TransactionOperation;
-import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.XidCodecSupport;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
@@ -143,16 +147,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
*/
public class JournalStorageManager implements StorageManager {
- private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
-
- private final Semaphore pageMaxConcurrentIO;
-
- private final BatchingIDGenerator idGenerator;
-
- private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
-
- private ReplicationManager replicator;
-
public enum JournalContent {
BINDINGS((byte) 0), MESSAGES((byte) 1);
@@ -171,6 +165,16 @@ public class JournalStorageManager implements StorageManager {
}
}
+ private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
+
+ private final Semaphore pageMaxConcurrentIO;
+
+ private final BatchingIDGenerator idGenerator;
+
+ private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
+
+ private ReplicationManager replicator;
+
private final SequentialFileFactory journalFF;
private Journal messageJournal;
@@ -1527,13 +1531,13 @@ public class JournalStorageManager implements StorageManager {
encoding.decode(buff);
- PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+ PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
- sub.getCounter().loadValue(record.id, encoding.value);
+ sub.getCounter().loadValue(record.id, encoding.getValue());
}
else {
- ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.queueID);
+ ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
@@ -1545,13 +1549,13 @@ public class JournalStorageManager implements StorageManager {
encoding.decode(buff);
- PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+ PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
- sub.getCounter().loadInc(record.id, encoding.value);
+ sub.getCounter().loadInc(record.id, encoding.getValue());
}
else {
- ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.queueID);
+ ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
@@ -2024,7 +2028,7 @@ public class JournalStorageManager implements StorageManager {
// Package protected ---------------------------------------------
- private void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
+ protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
@@ -2138,7 +2142,7 @@ public class JournalStorageManager implements StorageManager {
* @return
* @throws Exception
*/
- private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages,
+ protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages,
final ActiveMQBuffer buff) throws Exception {
LargeServerMessage largeMessage = createLargeMessage();
@@ -2177,11 +2181,11 @@ public class JournalStorageManager implements StorageManager {
JournalLoader journalLoader) throws Exception {
// recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
- XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+ XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
Xid xid = encodingXid.xid;
- Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
+ Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);
List<MessageReference> referencesToAck = new ArrayList<>();
@@ -2191,7 +2195,7 @@ public class JournalStorageManager implements StorageManager {
// Then have reacknowledge(tx) methods on queue, which needs to add the page size
// first get any sent messages for this tx and recreate
- for (RecordInfo record : preparedTransaction.records) {
+ for (RecordInfo record : preparedTransaction.getRecords()) {
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
@@ -2310,14 +2314,14 @@ public class JournalStorageManager implements StorageManager {
encoding.decode(buff);
- PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+ PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
- sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.value);
+ sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue());
sub.notEmpty();
}
else {
- ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
+ ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
}
break;
@@ -2329,7 +2333,7 @@ public class JournalStorageManager implements StorageManager {
}
}
- for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete) {
+ for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) {
byte[] data = recordDeleted.data;
if (data.length > 0) {
@@ -2431,879 +2435,7 @@ public class JournalStorageManager implements StorageManager {
}
}
- /**
- * It's public as other classes may want to unparse data on tools
- */
- public static class XidEncoding implements EncodingSupport {
-
- public final Xid xid;
-
- XidEncoding(final Xid xid) {
- this.xid = xid;
- }
-
- XidEncoding(final byte[] data) {
- xid = XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data));
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- throw new IllegalStateException("Non Supported Operation");
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- XidCodecSupport.encodeXid(xid, buffer);
- }
-
- @Override
- public int getEncodeSize() {
- return XidCodecSupport.getXidEncodeLength(xid);
- }
- }
-
- protected static class HeuristicCompletionEncoding implements EncodingSupport {
-
- public Xid xid;
-
- public boolean isCommit;
-
- @Override
- public String toString() {
- return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
- }
-
- HeuristicCompletionEncoding(final Xid xid, final boolean isCommit) {
- this.xid = xid;
- this.isCommit = isCommit;
- }
-
- HeuristicCompletionEncoding() {
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- xid = XidCodecSupport.decodeXid(buffer);
- isCommit = buffer.readBoolean();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- XidCodecSupport.encodeXid(xid, buffer);
- buffer.writeBoolean(isCommit);
- }
-
- @Override
- public int getEncodeSize() {
- return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
- }
- }
-
- private static class GroupingEncoding implements EncodingSupport, GroupingInfo {
-
- public long id;
-
- public SimpleString groupId;
-
- public SimpleString clusterName;
-
- public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName) {
- this.id = id;
- this.groupId = groupId;
- this.clusterName = clusterName;
- }
-
- public GroupingEncoding() {
- }
-
- @Override
- public int getEncodeSize() {
- return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeSimpleString(groupId);
- buffer.writeSimpleString(clusterName);
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- groupId = buffer.readSimpleString();
- clusterName = buffer.readSimpleString();
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- public void setId(final long id) {
- this.id = id;
- }
-
- @Override
- public SimpleString getGroupId() {
- return groupId;
- }
-
- @Override
- public SimpleString getClusterName() {
- return clusterName;
- }
-
- @Override
- public String toString() {
- return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
- }
- }
-
- public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
-
- public long id;
-
- public SimpleString name;
-
- public SimpleString address;
-
- public SimpleString filterString;
-
- public boolean autoCreated;
-
- public SimpleString user;
-
- public PersistentQueueBindingEncoding() {
- }
-
- @Override
- public String toString() {
- return "PersistentQueueBindingEncoding [id=" + id +
- ", name=" +
- name +
- ", address=" +
- address +
- ", filterString=" +
- filterString +
- ", user=" +
- user +
- ", autoCreated=" +
- autoCreated +
- "]";
- }
-
- public PersistentQueueBindingEncoding(final SimpleString name,
- final SimpleString address,
- final SimpleString filterString,
- final SimpleString user,
- final boolean autoCreated) {
- this.name = name;
- this.address = address;
- this.filterString = filterString;
- this.user = user;
- this.autoCreated = autoCreated;
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- public void setId(final long id) {
- this.id = id;
- }
-
- @Override
- public SimpleString getAddress() {
- return address;
- }
-
- @Override
- public void replaceQueueName(SimpleString newName) {
- this.name = newName;
- }
-
- @Override
- public SimpleString getFilterString() {
- return filterString;
- }
-
- @Override
- public SimpleString getQueueName() {
- return name;
- }
-
- @Override
- public SimpleString getUser() {
- return user;
- }
-
- @Override
- public boolean isAutoCreated() {
- return autoCreated;
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- name = buffer.readSimpleString();
- address = buffer.readSimpleString();
- filterString = buffer.readNullableSimpleString();
-
- String metadata = buffer.readNullableSimpleString().toString();
- if (metadata != null) {
- String[] elements = metadata.split(";");
- for (String element : elements) {
- String[] keyValuePair = element.split("=");
- if (keyValuePair.length == 2) {
- if (keyValuePair[0].equals("user")) {
- user = SimpleString.toSimpleString(keyValuePair[1]);
- }
- }
- }
- }
-
- autoCreated = buffer.readBoolean();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeSimpleString(name);
- buffer.writeSimpleString(address);
- buffer.writeNullableSimpleString(filterString);
- buffer.writeNullableSimpleString(createMetadata());
- buffer.writeBoolean(autoCreated);
- }
-
- @Override
- public int getEncodeSize() {
- return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
- SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
- SimpleString.sizeofNullableString(createMetadata());
- }
-
- private SimpleString createMetadata() {
- StringBuilder metadata = new StringBuilder();
- metadata.append("user=").append(user).append(";");
- return SimpleString.toSimpleString(metadata.toString());
- }
- }
-
- public static class LargeMessageEncoding implements EncodingSupport {
-
- public final LargeServerMessage message;
-
- public LargeMessageEncoding(final LargeServerMessage message) {
- this.message = message;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
- */
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- message.decodeHeadersAndProperties(buffer);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
- */
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- message.encode(buffer);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
- */
- @Override
- public int getEncodeSize() {
- return message.getEncodeSize();
- }
-
- }
-
- public static class PendingLargeMessageEncoding implements EncodingSupport {
-
- public long largeMessageID;
-
- public PendingLargeMessageEncoding(final long pendingLargeMessageID) {
- this.largeMessageID = pendingLargeMessageID;
- }
-
- public PendingLargeMessageEncoding() {
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
- */
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- largeMessageID = buffer.readLong();
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
- */
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeLong(largeMessageID);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
- */
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG;
- }
-
- @Override
- public String toString() {
- return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
- }
-
- }
-
- public static class DeliveryCountUpdateEncoding implements EncodingSupport {
-
- public long queueID;
-
- public int count;
-
- public DeliveryCountUpdateEncoding() {
- super();
- }
-
- public DeliveryCountUpdateEncoding(final long queueID, final int count) {
- super();
- this.queueID = queueID;
- this.count = count;
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- count = buffer.readInt();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- buffer.writeInt(count);
- }
-
- @Override
- public int getEncodeSize() {
- return 8 + 4;
- }
-
- @Override
- public String toString() {
- return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
- }
-
- }
-
- public static class QueueEncoding implements EncodingSupport {
-
- public long queueID;
-
- public QueueEncoding(final long queueID) {
- super();
- this.queueID = queueID;
- }
-
- public QueueEncoding() {
- super();
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- }
-
- @Override
- public int getEncodeSize() {
- return 8;
- }
-
- @Override
- public String toString() {
- return "QueueEncoding [queueID=" + queueID + "]";
- }
-
- }
-
- private static class DeleteEncoding implements EncodingSupport {
-
- public byte recordType;
-
- public long id;
-
- public DeleteEncoding(final byte recordType, final long id) {
- this.recordType = recordType;
- this.id = id;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
- */
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
- */
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeByte(recordType);
- buffer.writeLong(id);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
- */
- @Override
- public void decode(ActiveMQBuffer buffer) {
- recordType = buffer.readByte();
- id = buffer.readLong();
- }
- }
-
- public static class RefEncoding extends QueueEncoding {
-
- public RefEncoding() {
- super();
- }
-
- public RefEncoding(final long queueID) {
- super(queueID);
- }
- }
-
- public static class PageUpdateTXEncoding implements EncodingSupport {
-
- public long pageTX;
-
- public int recods;
-
- @Override
- public String toString() {
- return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
- }
-
- public PageUpdateTXEncoding() {
- }
-
- public PageUpdateTXEncoding(final long pageTX, final int records) {
- this.pageTX = pageTX;
- this.recods = records;
- }
-
- @Override
- public void decode(ActiveMQBuffer buffer) {
- this.pageTX = buffer.readLong();
- this.recods = buffer.readInt();
- }
-
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeLong(pageTX);
- buffer.writeInt(recods);
- }
-
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
- }
-
- public List<MessageReference> getRelatedMessageReferences() {
- return null;
- }
- }
-
- protected static class ScheduledDeliveryEncoding extends QueueEncoding {
-
- long scheduledDeliveryTime;
-
- @Override
- public String toString() {
- return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]";
- }
-
- private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {
- super(queueID);
- this.scheduledDeliveryTime = scheduledDeliveryTime;
- }
-
- public ScheduledDeliveryEncoding() {
- }
-
- @Override
- public int getEncodeSize() {
- return super.getEncodeSize() + 8;
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- super.encode(buffer);
- buffer.writeLong(scheduledDeliveryTime);
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- super.decode(buffer);
- scheduledDeliveryTime = buffer.readLong();
- }
- }
-
- public static class DuplicateIDEncoding implements EncodingSupport {
-
- SimpleString address;
-
- byte[] duplID;
-
- public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
- this.address = address;
-
- this.duplID = duplID;
- }
-
- public DuplicateIDEncoding() {
- }
-
- @Override
- public void decode(final ActiveMQBuffer buffer) {
- address = buffer.readSimpleString();
-
- int size = buffer.readInt();
-
- duplID = new byte[size];
-
- buffer.readBytes(duplID);
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- buffer.writeSimpleString(address);
-
- buffer.writeInt(duplID.length);
-
- buffer.writeBytes(duplID);
- }
-
- @Override
- public int getEncodeSize() {
- return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
- }
-
- @Override
- public String toString() {
- // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
- // and this may be useful to validate the journal on those tests
- // You may uncomment these two lines on that case and replcate the toString for the PrintData
-
- // SimpleString simpleStr = new SimpleString(duplID);
- // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
-
- String bridgeRepresentation = null;
-
- // The bridge will generate IDs on these terms:
- // This will make them easier to read
- if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
- try {
- ByteBuffer buff = ByteBuffer.wrap(duplID);
-
- // 16 for UUID
- byte[] bytesUUID = new byte[16];
-
- buff.get(bytesUUID);
-
- UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
-
- long id = buff.getLong();
- bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
- }
- catch (Throwable ignored) {
- bridgeRepresentation = null;
- }
- }
-
- if (bridgeRepresentation != null) {
- return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
- bridgeRepresentation + "]";
- }
- else {
- return "DuplicateIDEncoding [address=" + address + ",str=" + ByteUtil.toSimpleString(duplID) + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
- }
- }
- }
-
- /**
- * This is only used when loading a transaction.
- * <p>
- * it might be possible to merge the functionality of this class with
- * {@link org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.FinishPageMessageOperation}
- */
- // TODO: merge this class with the one on the PagingStoreImpl
- private static class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
-
- @Override
- public void afterCommit(final Transaction tx) {
- // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
- // transaction until all the messages were added to the queue
- // or else we could deliver the messages out of order
-
- PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
- if (pageTransaction != null) {
- pageTransaction.commit();
- }
- }
-
- @Override
- public void afterRollback(final Transaction tx) {
- PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
- if (tx.getState() == State.PREPARED && pageTransaction != null) {
- pageTransaction.rollback();
- }
- }
- }
-
- protected static final class PageCountRecord implements EncodingSupport {
-
- @Override
- public String toString() {
- return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
- }
-
- PageCountRecord() {
-
- }
-
- PageCountRecord(long queueID, long value) {
- this.queueID = queueID;
- this.value = value;
- }
-
- long queueID;
-
- long value;
-
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG * 2;
- }
-
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- buffer.writeLong(value);
- }
-
- @Override
- public void decode(ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- value = buffer.readLong();
- }
-
- }
-
- protected static final class PageCountPendingImpl implements EncodingSupport, PageCountPending {
-
- @Override
- public String toString() {
- return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
- }
-
- PageCountPendingImpl() {
-
- }
-
- PageCountPendingImpl(long queueID, long pageID, int inc) {
- this.queueID = queueID;
- this.pageID = pageID;
- }
-
- long id;
-
- long queueID;
-
- long pageID;
-
- public void setID(long id) {
- this.id = id;
- }
-
- @Override
- public long getID() {
- return id;
- }
-
- @Override
- public long getQueueID() {
- return queueID;
- }
-
- @Override
- public long getPageID() {
- return pageID;
- }
-
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG * 2;
- }
-
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- buffer.writeLong(pageID);
- }
-
- @Override
- public void decode(ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- pageID = buffer.readLong();
- }
-
- }
-
- protected static final class PageCountRecordInc implements EncodingSupport {
-
- @Override
- public String toString() {
- return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
- }
-
- PageCountRecordInc() {
-
- }
-
- PageCountRecordInc(long queueID, int value) {
- this.queueID = queueID;
- this.value = value;
- }
-
- long queueID;
-
- int value;
-
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
- }
-
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- buffer.writeInt(value);
- }
-
- @Override
- public void decode(ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- value = buffer.readInt();
- }
-
- }
-
- public static class CursorAckRecordEncoding implements EncodingSupport {
-
- public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
- this.queueID = queueID;
- this.position = position;
- }
-
- public CursorAckRecordEncoding() {
- this.position = new PagePositionImpl();
- }
-
- @Override
- public String toString() {
- return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
- }
-
- public long queueID;
-
- public PagePosition position;
-
- @Override
- public int getEncodeSize() {
- return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
- }
-
- @Override
- public void encode(ActiveMQBuffer buffer) {
- buffer.writeLong(queueID);
- buffer.writeLong(position.getPageNr());
- buffer.writeInt(position.getMessageNr());
- }
-
- @Override
- public void decode(ActiveMQBuffer buffer) {
- queueID = buffer.readLong();
- long pageNR = buffer.readLong();
- int messageNR = buffer.readInt();
- this.position = new PagePositionImpl(pageNR, messageNR);
- }
- }
-
- private class LargeMessageTXFailureCallback implements TransactionFailureCallback {
-
- private final Map<Long, ServerMessage> messages;
-
- public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages) {
- super();
- this.messages = messages;
- }
-
- @Override
- public void failedTransaction(final long transactionID,
- final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete) {
- for (RecordInfo record : records) {
- if (record.userRecordType == ADD_LARGE_MESSAGE) {
- byte[] data = record.data;
-
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-
- try {
- LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
- serverMessage.decrementDelayDeletionCount();
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.journalError(e);
- }
- }
- }
- }
-
- }
-
- public static final class AckDescribe {
-
- public RefEncoding refEncoding;
-
- public AckDescribe(RefEncoding refEncoding) {
- this.refEncoding = refEncoding;
- }
-
- @Override
- public String toString() {
- return "ACK;" + refEncoding;
- }
-
- }
-
- /**
+ /*
* @param id
* @param buffer
* @return
@@ -3379,21 +2511,4 @@ public class JournalStorageManager implements StorageManager {
}
txoper.confirmedMessages.add(recordID);
}
-
- final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
-
- public List<Long> confirmedMessages = new LinkedList<>();
-
- @Override
- public void afterRollback(Transaction tx) {
- for (Long msg : confirmedMessages) {
- try {
- JournalStorageManager.this.confirmPendingLargeMessage(msg);
- }
- catch (Throwable e) {
- ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
new file mode 100644
index 0000000..c2133d9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
+
+public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
+
+ private AbstractJournalStorageManager journalStorageManager;
+ private final Map<Long, ServerMessage> messages;
+
+ public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
+ final Map<Long, ServerMessage> messages) {
+ super();
+ this.journalStorageManager = journalStorageManager;
+ this.messages = messages;
+ }
+
+ public void failedTransaction(final long transactionID,
+ final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete) {
+ for (RecordInfo record : records) {
+ if (record.userRecordType == ADD_LARGE_MESSAGE) {
+ byte[] data = record.data;
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+
+ try {
+ LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff);
+ serverMessage.decrementDelayDeletionCount();
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.journalError(e);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
new file mode 100644
index 0000000..361477a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+
+public final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
+
+ private AbstractJournalStorageManager journalStorageManager;
+ public List<Long> confirmedMessages = new LinkedList<Long>();
+
+ public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager journalStorageManager) {
+ this.journalStorageManager = journalStorageManager;
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ for (Long msg : confirmedMessages) {
+ try {
+ journalStorageManager.confirmPendingLargeMessage(msg);
+ }
+ catch (Throwable e) {
+ ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
new file mode 100644
index 0000000..4e1fada
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class CursorAckRecordEncoding implements EncodingSupport {
+
+ public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
+ this.queueID = queueID;
+ this.position = position;
+ }
+
+ public CursorAckRecordEncoding() {
+ this.position = new PagePositionImpl();
+ }
+
+ @Override
+ public String toString() {
+ return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
+ }
+
+ public long queueID;
+
+ public PagePosition position;
+
+ public int getEncodeSize() {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ public void encode(ActiveMQBuffer buffer) {
+ buffer.writeLong(queueID);
+ buffer.writeLong(position.getPageNr());
+ buffer.writeInt(position.getMessageNr());
+ }
+
+ public void decode(ActiveMQBuffer buffer) {
+ queueID = buffer.readLong();
+ long pageNR = buffer.readLong();
+ int messageNR = buffer.readInt();
+ this.position = new PagePositionImpl(pageNR, messageNR);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
new file mode 100644
index 0000000..3001edf
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class DeleteEncoding implements EncodingSupport {
+
+ public byte recordType;
+
+ public long id;
+
+ public DeleteEncoding(final byte recordType, final long id) {
+ this.recordType = recordType;
+ this.id = id;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
+ */
+ @Override
+ public int getEncodeSize() {
+ return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
+ */
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ buffer.writeByte(recordType);
+ buffer.writeLong(id);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
+ */
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+ recordType = buffer.readByte();
+ id = buffer.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
new file mode 100644
index 0000000..5c4541d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+
+public class DeliveryCountUpdateEncoding implements EncodingSupport {
+
+ public long queueID;
+
+ public int count;
+
+ public DeliveryCountUpdateEncoding() {
+ super();
+ }
+
+ public DeliveryCountUpdateEncoding(final long queueID, final int count) {
+ super();
+ this.queueID = queueID;
+ this.count = count;
+ }
+
+ public void decode(final ActiveMQBuffer buffer) {
+ queueID = buffer.readLong();
+ count = buffer.readInt();
+ }
+
+ public void encode(final ActiveMQBuffer buffer) {
+ buffer.writeLong(queueID);
+ buffer.writeInt(count);
+ }
+
+ public int getEncodeSize() {
+ return 8 + 4;
+ }
+
+ @Override
+ public String toString() {
+ return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
new file mode 100644
index 0000000..49d8dc8
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.UUID;
+
+public class DuplicateIDEncoding implements EncodingSupport {
+
+ public SimpleString address;
+
+ public byte[] duplID;
+
+ public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
+ this.address = address;
+
+ this.duplID = duplID;
+ }
+
+ public DuplicateIDEncoding() {
+ }
+
+ public void decode(final ActiveMQBuffer buffer) {
+ address = buffer.readSimpleString();
+
+ int size = buffer.readInt();
+
+ duplID = new byte[size];
+
+ buffer.readBytes(duplID);
+ }
+
+ public void encode(final ActiveMQBuffer buffer) {
+ buffer.writeSimpleString(address);
+
+ buffer.writeInt(duplID.length);
+
+ buffer.writeBytes(duplID);
+ }
+
+ public int getEncodeSize() {
+ return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
+ }
+
+ @Override
+ public String toString() {
+ // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
+ // and this may be useful to validate the journal on those tests
+ // You may uncomment these two lines on that case and replcate the toString for the PrintData
+
+ // SimpleString simpleStr = new SimpleString(duplID);
+ // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
+
+ String bridgeRepresentation = null;
+
+ // The bridge will generate IDs on these terms:
+ // This will make them easier to read
+ if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
+ try {
+ ByteBuffer buff = ByteBuffer.wrap(duplID);
+
+ // 16 for UUID
+ byte[] bytesUUID = new byte[16];
+
+ buff.get(bytesUUID);
+
+ UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
+
+ long id = buff.getLong();
+ bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
+ }
+ catch (Throwable ignored) {
+ bridgeRepresentation = null;
+ }
+ }
+
+ if (bridgeRepresentation != null) {
+ return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
+ bridgeRepresentation + "]";
+ }
+ else {
+ return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
new file mode 100644
index 0000000..d741e95
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
+
+/**
+ * This is only used when loading a transaction.
+ * <p>
+ * it might be possible to merge the functionality of this class with
+ * {@link FinishPageMessageOperation}
+ */
+// TODO: merge this class with the one on the PagingStoreImpl
+public class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
+
+ @Override
+ public void afterCommit(final Transaction tx) {
+ // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+ // transaction until all the messages were added to the queue
+ // or else we could deliver the messages out of order
+
+ PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction != null) {
+ pageTransaction.commit();
+ }
+ }
+
+ @Override
+ public void afterRollback(final Transaction tx) {
+ PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (tx.getState() == Transaction.State.PREPARED && pageTransaction != null) {
+ pageTransaction.rollback();
+ }
+ }
+}