You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/07 16:36:39 UTC
[19/50] [abbrv] activemq-artemis git commit: actual persistence work
actual persistence work
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4e378c16
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4e378c16
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4e378c16
Branch: refs/heads/ARTEMIS-780
Commit: 4e378c16425245eb49fa653520a99cb6c6281812
Parents: 892bea4
Author: jbertram <jb...@apache.com>
Authored: Fri Oct 21 19:58:01 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500
----------------------------------------------------------------------
.../core/persistence/AddressBindingInfo.java | 4 --
.../core/persistence/StorageManager.java | 8 +++-
.../journal/AbstractJournalStorageManager.java | 43 +++++++++++++++--
.../impl/journal/DescribeJournal.java | 2 +-
.../impl/journal/JournalRecordIds.java | 2 +
.../codec/PersistentAddressBindingEncoding.java | 49 +------------------
.../impl/nullpm/NullStorageManager.java | 13 ++++-
.../core/server/impl/ActiveMQServerImpl.java | 20 +++++++-
.../artemis/core/server/impl/AddressInfo.java | 9 ++--
.../artemis/core/server/impl/JournalLoader.java | 4 ++
.../server/impl/PostOfficeJournalLoader.java | 16 +++++++
.../transaction/impl/TransactionImplTest.java | 15 +++++-
.../addressing/AddressConfigTest.java | 50 ++++++++++++++++++++
.../DeleteMessagesOnStartupTest.java | 3 +-
.../integration/persistence/RestartSMTest.java | 5 +-
.../persistence/StorageManagerTestBase.java | 3 +-
.../impl/DuplicateDetectionUnitTest.java | 7 +--
.../server/impl/fakes/FakeJournalLoader.java | 6 +++
18 files changed, 188 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
index 83d37bc..838be12 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
@@ -25,10 +25,6 @@ public interface AddressBindingInfo {
SimpleString getName();
- boolean isAutoCreated();
-
- SimpleString getUser();
-
AddressInfo.RoutingType getRoutingType();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index bbfec14..ee11577 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -298,8 +299,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void deleteQueueStatus(long recordID) throws Exception;
+ void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception;
+
+ void deleteAddressBinding(long tx, long addressBindingID) throws Exception;
+
JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
- List<GroupingInfo> groupingInfos) throws Exception;
+ List<GroupingInfo> groupingInfos,
+ List<AddressBindingInfo> addressBindingInfos) throws Exception;
// grouping related operations
void addGrouping(GroupBinding groupBinding) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index ecaa86e..b67cfa6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@@ -77,6 +78,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
@@ -93,6 +95,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -1261,7 +1264,29 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
} finally {
readUnLock();
}
+ }
+
+ public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
+ PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType());
+ readLock();
+ try {
+ long recordID = idGenerator.generateID();
+ bindingEncoding.setId(recordID);
+ bindingsJournal.appendAddRecordTransactional(tx, recordID, JournalRecordIds.ADDRESS_BINDING_RECORD, bindingEncoding);
+ } finally {
+ readUnLock();
+ }
+ }
+
+ @Override
+ public void deleteAddressBinding(long tx, final long addressBindingID) throws Exception {
+ readLock();
+ try {
+ bindingsJournal.appendDeleteRecordTransactional(tx, addressBindingID);
+ } finally {
+ readUnLock();
+ }
}
@Override
@@ -1347,7 +1372,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
- final List<GroupingInfo> groupingInfos) throws Exception {
+ final List<GroupingInfo> groupingInfos,
+ final List<AddressBindingInfo> addressBindingInfos) throws Exception {
List<RecordInfo> records = new ArrayList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
@@ -1364,12 +1390,15 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
byte rec = record.getUserRecordType();
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
- PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer);
-
+ PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
queueBindingInfos.add(bindingEncoding);
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer);
+ } else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
+ PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
+ ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding);
+ addressBindingInfos.add(bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
groupingInfos.add(encoding);
@@ -1849,7 +1878,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
* @param buffer
* @return
*/
- protected static PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer) {
+ protected static PersistentQueueBindingEncoding newQueueBindingEncoding(long id, ActiveMQBuffer buffer) {
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding();
bindingEncoding.decode(buffer);
@@ -1872,8 +1901,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
return statusEncoding;
}
+ protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long id, ActiveMQBuffer buffer) {
+ PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding();
+ bindingEncoding.decode(buffer);
+ bindingEncoding.setId(id);
+ return bindingEncoding;
+ }
@Override
public boolean addToPage(PagingStore store,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 58723c6..a5c1fd7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -555,7 +555,7 @@ public final class DescribeJournal {
return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer);
case QUEUE_BINDING_RECORD:
- return AbstractJournalStorageManager.newBindingEncoding(id, buffer);
+ return AbstractJournalStorageManager.newQueueBindingEncoding(id, buffer);
case ID_COUNTER_RECORD:
EncodingSupport idReturn = new IDCounterEncoding();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 0169f38..cd1d526 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -83,4 +83,6 @@ public final class JournalRecordIds {
public static final byte PAGE_CURSOR_COMPLETE = 42;
public static final byte PAGE_CURSOR_PENDING_COUNTER = 43;
+
+ public static final byte ADDRESS_BINDING_RECORD = 44;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
index 9f47362..7ef7e4d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
@@ -29,10 +29,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public SimpleString name;
- public boolean autoCreated;
-
- public SimpleString user;
-
public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() {
@@ -43,22 +39,14 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return "PersistentAddressBindingEncoding [id=" + id +
", name=" +
name +
- ", user=" +
- user +
- ", autoCreated=" +
- autoCreated +
", routingType=" +
routingType +
"]";
}
public PersistentAddressBindingEncoding(final SimpleString name,
- final SimpleString user,
- final boolean autoCreated,
final AddressInfo.RoutingType routingType) {
this.name = name;
- this.user = user;
- this.autoCreated = autoCreated;
this.routingType = routingType;
}
@@ -77,16 +65,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
}
@Override
- public SimpleString getUser() {
- return user;
- }
-
- @Override
- public boolean isAutoCreated() {
- return autoCreated;
- }
-
- @Override
public AddressInfo.RoutingType getRoutingType() {
return routingType;
}
@@ -94,42 +72,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
-
- String metadata = buffer.readNullableSimpleString().toString();
- if (metadata != null) {
- String[] elements = metadata.split(";");
- for (String element : elements) {
- String[] keyValuePair = element.split("=");
- if (keyValuePair.length == 2) {
- if (keyValuePair[0].equals("user")) {
- user = SimpleString.toSimpleString(keyValuePair[1]);
- }
- }
- }
- }
-
- autoCreated = buffer.readBoolean();
routingType = AddressInfo.RoutingType.getType(buffer.readByte());
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
- buffer.writeNullableSimpleString(createMetadata());
- buffer.writeBoolean(autoCreated);
buffer.writeByte(routingType.getType());
}
@Override
public int getEncodeSize() {
- return SimpleString.sizeofString(name) + DataConstants.SIZE_BOOLEAN +
- SimpleString.sizeofNullableString(createMetadata()) +
- DataConstants.SIZE_BYTE;
- }
-
- private SimpleString createMetadata() {
- StringBuilder metadata = new StringBuilder();
- metadata.append("user=").append(user).append(";");
- return SimpleString.toSimpleString(metadata.toString());
+ return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 3a2999e..404f248 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -155,12 +157,21 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
+ }
+
+ @Override
+ public void deleteAddressBinding(long tx, long addressBindingID) throws Exception {
+ }
+
+ @Override
public void commit(final long txID) throws Exception {
}
@Override
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
- final List<GroupingInfo> groupingInfos) throws Exception {
+ final List<GroupingInfo> groupingInfos,
+ final List<AddressBindingInfo> addressBindingInfos) throws Exception {
return new JournalLoadInformation();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 375e678..cce81c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@@ -2137,7 +2138,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
List<GroupingInfo> groupingInfos = new ArrayList<>();
- journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+ List<AddressBindingInfo> addressBindingInfos = new ArrayList<>();
+
+ journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos, addressBindingInfos);
recoverStoredConfigs();
@@ -2147,6 +2150,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
journalLoader.handleGroupingBindings(groupingInfos);
+ Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
+
+ journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
+
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>();
@@ -2245,6 +2252,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
+ boolean addressAlreadyExists = true;
+
+ if (postOffice.getAddressInfo(queue.getAddress()) == null) {
+ postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
+ .setRoutingType(AddressInfo.RoutingType.MULTICAST));
+ addressAlreadyExists = false;
+ }
+
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
@@ -2255,6 +2270,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
+ if (!addressAlreadyExists) {
+ storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
+ }
}
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 1449107..4e982c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -36,24 +36,27 @@ public class AddressInfo {
return routingType;
}
- public void setRoutingType(RoutingType routingType) {
+ public AddressInfo setRoutingType(RoutingType routingType) {
this.routingType = routingType;
+ return this;
}
public boolean isDefaultDeleteOnNoConsumers() {
return defaultDeleteOnNoConsumers;
}
- public void setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) {
+ public AddressInfo setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) {
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
+ return this;
}
public int getDefaultMaxConsumers() {
return defaultMaxConsumers;
}
- public void setDefaultMaxConsumers(int defaultMaxConsumers) {
+ public AddressInfo setDefaultMaxConsumers(int defaultMaxConsumers) {
this.defaultMaxConsumers = defaultMaxConsumers;
+ return this;
}
public SimpleString getName() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
index 6f36ff5..40cef50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@@ -37,6 +38,9 @@ public interface JournalLoader {
void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
List<QueueBindingInfo> queueBindingInfos) throws Exception;
+ void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
+ List<AddressBindingInfo> addressBindingInfo) throws Exception;
+
void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;
void handleNoMessageReferences(Map<Long, ServerMessage> messages);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 71c5b2b..4e89e8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
@@ -166,6 +167,21 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
@Override
+ public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
+ List<AddressBindingInfo> addressBindingInfos) throws Exception {
+ for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
+ addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
+
+ // TODO: figure out what else to set here
+ AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName())
+ .setRoutingType(addressBindingInfo.getRoutingType());
+
+ postOffice.addAddressInfo(addressInfo);
+ managementService.registerAddress(addressInfo.getName());
+ }
+ }
+
+ @Override
public void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception {
for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet()) {
long queueID = entry.getKey();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 93c5c9d..97dc90d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -529,8 +531,19 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
+
+ }
+
+ @Override
+ public void deleteAddressBinding(long tx, long addressBindingID) throws Exception {
+
+ }
+
+ @Override
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
- List<GroupingInfo> groupingInfos) throws Exception {
+ List<GroupingInfo> groupingInfos,
+ List<AddressBindingInfo> addressBindingInfos) throws Exception {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
new file mode 100644
index 0000000..f3a0beb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.addressing;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AddressConfigTest extends ActiveMQTestBase {
+
+ protected ActiveMQServer server;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration configuration = createDefaultInVMConfig();
+ server = createServer(true, configuration);
+ server.start();
+ }
+
+ @Test
+ public void persistAddressConfigTest() throws Exception {
+ server.createQueue(SimpleString.toSimpleString("myAddress"), SimpleString.toSimpleString("myQueue"), null, true, false);
+ server.stop();
+ server.start();
+ AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));
+ assertNotNull(addressInfo);
+ assertEquals(AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index 7d515d8..90f7c5f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@@ -76,7 +77,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
FakePostOffice postOffice = new FakePostOffice();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 49d3a12..2ee879f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@@ -73,7 +74,7 @@ public class RestartSMTest extends ActiveMQTestBase {
List<QueueBindingInfo> queueBindingInfos = new ArrayList<>();
- journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, null, null, null, null, null, null, new FakeJournalLoader());
@@ -87,7 +88,7 @@ public class RestartSMTest extends ActiveMQTestBase {
queueBindingInfos = new ArrayList<>();
- journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.start();
} finally {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index a104363..508f23b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -128,7 +129,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(new FakePostOffice(), null, null, null, null, null, null, new FakeJournalLoader());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 96fa35c..58c5c4f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@@ -95,7 +96,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>();
@@ -114,7 +115,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
@@ -137,7 +138,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java
index 32ad718..547d669 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@@ -49,6 +50,11 @@ public class FakeJournalLoader implements JournalLoader {
}
@Override
+ public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
+ List<AddressBindingInfo> addressBindingInfo) throws Exception {
+ }
+
+ @Override
public void handleGroupingBindings(List<GroupingInfo> groupingInfos) {
}