You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/07/27 13:42:22 UTC
[2/2] qpid-broker-j git commit: QPID-7781: QPID-7442: [Java Broker]
On deletion ensure that the store remains open until the link registry is
deleted. On virtualhost delete ensure that the virtualhost and durable
children are removed from the configuration store.
QPID-7781: QPID-7442: [Java Broker] On deletion ensure that the store remains open until the link registry is deleted.
On virtualhost delete ensure that the virtualhost and durable children are removed from the configuration store.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e697c9ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e697c9ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e697c9ed
Branch: refs/heads/master
Commit: e697c9eded66686f906feadd969566a248a5d16a
Parents: ebec2d3
Author: Keith Wall <kw...@apache.org>
Authored: Wed Jul 26 16:45:53 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jul 27 13:44:48 2017 +0100
----------------------------------------------------------------------
.../store/berkeleydb/BDBCacheSizeSetter.java | 25 ++--
.../VirtualHostStoreUpgraderAndRecoverer.java | 22 +++-
.../server/virtualhost/AbstractVirtualHost.java | 130 ++++++++++++-------
3 files changed, 117 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
index 8d0e755..e7c16a0 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
@@ -99,21 +99,18 @@ public class BDBCacheSizeSetter extends AbstractConfigurationChangeListener
}
}
int numberOfJEEnvironments = bdbEnvironmentContainers.size();
- if (numberOfJEEnvironments == 0)
+ if (numberOfJEEnvironments > 0)
{
- // This should never happen out side of tests
- LOGGER.warn("Cannot find any BDBEnvironmentContainer instance when there should be at least one.");
- numberOfJEEnvironments = 1;
- }
- long cacheSize = totalCacheSize / numberOfJEEnvironments;
- if (cacheSize < BDBVirtualHost.BDB_MIN_CACHE_SIZE)
- {
- cacheSize = BDBVirtualHost.BDB_MIN_CACHE_SIZE;
- }
- LOGGER.debug("Setting JE cache size: totalCacheSize: {}; numberOfJEEnvironment: {}; cacheSize: {}", totalCacheSize, numberOfJEEnvironments, cacheSize);
- for (BDBEnvironmentContainer bdbEnvironmentContainer : bdbEnvironmentContainers)
- {
- bdbEnvironmentContainer.setBDBCacheSize(cacheSize);
+ long cacheSize = totalCacheSize / numberOfJEEnvironments;
+ if (cacheSize < BDBVirtualHost.BDB_MIN_CACHE_SIZE)
+ {
+ cacheSize = BDBVirtualHost.BDB_MIN_CACHE_SIZE;
+ }
+ LOGGER.debug("Setting JE cache size: totalCacheSize: {}; numberOfJEEnvironment: {}; cacheSize: {}", totalCacheSize, numberOfJEEnvironments, cacheSize);
+ for (BDBEnvironmentContainer bdbEnvironmentContainer : bdbEnvironmentContainers)
+ {
+ bdbEnvironmentContainer.setBDBCacheSize(cacheSize);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 84071bf..1572600 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -1039,9 +1039,9 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
if(child instanceof VirtualHost)
{
child.removeChangeListener(configChangeListener);
+ removeVirtualHostConfiguration((VirtualHost<?>) child, durableConfigurationStore);
}
}
-
});
if(isNew)
{
@@ -1052,4 +1052,24 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
}
}
+ private void removeVirtualHostConfiguration(final VirtualHost<?> virtualHost,
+ final DurableConfigurationStore durableConfigurationStore)
+ {
+ Set<ConfiguredObjectRecord> records = new HashSet<>();
+ applyRecursively(virtualHost, new RecursiveAction<ConfiguredObject<?>>()
+ {
+ @Override
+ public boolean applyToChildren(final ConfiguredObject<?> object)
+ {
+ return object.isDurable();
+ }
+
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
+ {
+ records.add(object.asObjectRecord());
+ }
+ });
+ durableConfigurationStore.remove(records.toArray(new ConfiguredObjectRecord[records.size()]));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index da32b3d..4db5344 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -156,9 +156,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final AccessControlContext _housekeepingJobContext;
private final AccessControlContext _fileSystemSpaceCheckerJobContext;
private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
- private TaskExecutor _preferenceTaskExecutor;
+ private volatile TaskExecutor _preferenceTaskExecutor;
+ private volatile boolean _deleteRequested;
- private static enum BlockingType { STORE, FILESYSTEM };
+ private enum BlockingType { STORE, FILESYSTEM };
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
@@ -167,7 +168,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
- private ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
+ private volatile ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
private final Broker<?> _broker;
@@ -1151,7 +1152,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- protected void shutdownHouseKeeping()
+ private void shutdownHouseKeeping()
{
if(_houseKeepingTaskExecutor != null)
{
@@ -1169,6 +1170,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_logger.warn("Interrupted during Housekeeping shutdown:", e);
Thread.currentThread().interrupt();
}
+ finally
+ {
+ _houseKeepingTaskExecutor = null;
+ }
}
}
@@ -1511,7 +1516,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
protected ListenableFuture<Void> beforeClose()
{
setState(State.UNAVAILABLE);
- _virtualHostLoggersToClose = new ArrayList(getChildren(VirtualHostLogger.class));
+ _virtualHostLoggersToClose = new ArrayList<>(getChildren(VirtualHostLogger.class));
//Stop Connections
return closeConnections();
}
@@ -1521,9 +1526,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
_dtxRegistry.close();
shutdownHouseKeeping();
+
+ if (_deleteRequested)
+ {
+ deleteLinkRegistry();
+ }
+
closeMessageStore();
stopPreferenceTaskExecutor();
closePreferenceStore();
+
+ if (_deleteRequested)
+ {
+ deleteMessageStore();
+ deletePreferenceStore();
+ }
+
closeNetworkConnectionScheduler();
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
@@ -1532,11 +1550,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- public ListenableFuture<Void> closeConnections()
+ private ListenableFuture<Void> closeConnections()
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Closing connection registry :" + _connections.size() + " connections.");
+ _logger.debug("Closing connection registry : {} connection(s).", _connections.size());
}
_acceptsConnections.set(false);
for(AMQPConnection<?> conn : _connections)
@@ -1719,7 +1737,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- protected void reportIfError(State state)
+ private void reportIfError(State state)
{
if (state == State.ERRORED)
{
@@ -2244,6 +2262,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (_preferenceTaskExecutor != null)
{
_preferenceTaskExecutor.stop();
+ _preferenceTaskExecutor = null;
}
}
@@ -2263,47 +2282,68 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ @SuppressWarnings("ignore")
@StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
private ListenableFuture<Void> doDelete()
{
+ _deleteRequested = true;
+
return doAfterAlways(closeAsync(),
- new Runnable()
- {
- @Override
- public void run()
- {
- if (_linkRegistry != null)
- {
- _linkRegistry.delete();
- }
- MessageStore ms = getMessageStore();
- if (ms != null)
- {
- try
- {
- ms.onDelete(AbstractVirtualHost.this);
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on message store deletion", e);
- }
- }
- PreferenceStore ps = _preferenceStore;
- if (ps != null)
- {
- try
- {
- ps.onDelete();
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on preference store deletion", e);
- }
- }
- deleted();
- setState(State.DELETED);
- }
- });
+ () ->
+ {
+ setState(State.DELETED);
+ deleted();
+ });
+ }
+
+ private void deleteLinkRegistry()
+ {
+ if (_linkRegistry != null)
+ {
+ _linkRegistry.delete();
+ _linkRegistry = null;
+ }
+ }
+
+ private void deletePreferenceStore()
+ {
+ final PreferenceStore ps = _preferenceStore;
+ if (ps != null)
+ {
+ try
+ {
+ ps.onDelete();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occurred on preference store deletion", e);
+ }
+ finally
+ {
+ _preferenceStore = null;
+
+ }
+ }
+ }
+
+ private void deleteMessageStore()
+ {
+ MessageStore ms = _messageStore;
+ if (ms != null)
+ {
+ try
+ {
+ ms.onDelete(AbstractVirtualHost.this);
+ }
+ catch (Exception e)
+ {
+ _logger.warn( "Exception occurred on message store deletion", e);
+ }
+ finally
+ {
+ _messageStore = null;
+ }
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org