You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/07/27 13:42:22 UTC

[2/2] qpid-broker-j git commit: QPID-7781: QPID-7442: [Java Broker] On deletion ensure that the store remains open until the link registry is deleted. On virtualhost delete ensure that the virtualhost and durable children are removed from the configurati

QPID-7781: QPID-7442: [Java Broker] On deletion ensure that the store remains open until the link registry is deleted.
On virtualhost delete ensure that the virtualhost and durable children are removed from the configuration store.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e697c9ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e697c9ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e697c9ed

Branch: refs/heads/master
Commit: e697c9eded66686f906feadd969566a248a5d16a
Parents: ebec2d3
Author: Keith Wall <kw...@apache.org>
Authored: Wed Jul 26 16:45:53 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jul 27 13:44:48 2017 +0100

----------------------------------------------------------------------
 .../store/berkeleydb/BDBCacheSizeSetter.java    |  25 ++--
 .../VirtualHostStoreUpgraderAndRecoverer.java   |  22 +++-
 .../server/virtualhost/AbstractVirtualHost.java | 130 ++++++++++++-------
 3 files changed, 117 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
index 8d0e755..e7c16a0 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBCacheSizeSetter.java
@@ -99,21 +99,18 @@ public class BDBCacheSizeSetter extends AbstractConfigurationChangeListener
             }
         }
         int numberOfJEEnvironments = bdbEnvironmentContainers.size();
-        if (numberOfJEEnvironments == 0)
+        if (numberOfJEEnvironments > 0)
         {
-            // This should never happen out side of tests
-            LOGGER.warn("Cannot find any BDBEnvironmentContainer instance when there should be at least one.");
-            numberOfJEEnvironments = 1;
-        }
-        long cacheSize = totalCacheSize / numberOfJEEnvironments;
-        if (cacheSize < BDBVirtualHost.BDB_MIN_CACHE_SIZE)
-        {
-            cacheSize = BDBVirtualHost.BDB_MIN_CACHE_SIZE;
-        }
-        LOGGER.debug("Setting JE cache size: totalCacheSize: {}; numberOfJEEnvironment: {}; cacheSize: {}", totalCacheSize, numberOfJEEnvironments, cacheSize);
-        for (BDBEnvironmentContainer bdbEnvironmentContainer : bdbEnvironmentContainers)
-        {
-            bdbEnvironmentContainer.setBDBCacheSize(cacheSize);
+            long cacheSize = totalCacheSize / numberOfJEEnvironments;
+            if (cacheSize < BDBVirtualHost.BDB_MIN_CACHE_SIZE)
+            {
+                cacheSize = BDBVirtualHost.BDB_MIN_CACHE_SIZE;
+            }
+            LOGGER.debug("Setting JE cache size: totalCacheSize: {}; numberOfJEEnvironment: {}; cacheSize: {}", totalCacheSize, numberOfJEEnvironments, cacheSize);
+            for (BDBEnvironmentContainer bdbEnvironmentContainer : bdbEnvironmentContainers)
+            {
+                bdbEnvironmentContainer.setBDBCacheSize(cacheSize);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 84071bf..1572600 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -1039,9 +1039,9 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
                 if(child instanceof VirtualHost)
                 {
                     child.removeChangeListener(configChangeListener);
+                    removeVirtualHostConfiguration((VirtualHost<?>) child, durableConfigurationStore);
                 }
             }
-
         });
         if(isNew)
         {
@@ -1052,4 +1052,24 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
         }
     }
 
+    private void removeVirtualHostConfiguration(final VirtualHost<?> virtualHost,
+                                                final DurableConfigurationStore durableConfigurationStore)
+    {
+        Set<ConfiguredObjectRecord> records = new HashSet<>();
+        applyRecursively(virtualHost, new RecursiveAction<ConfiguredObject<?>>()
+        {
+            @Override
+            public boolean applyToChildren(final ConfiguredObject<?> object)
+            {
+                return object.isDurable();
+            }
+
+            @Override
+            public void performAction(final ConfiguredObject<?> object)
+            {
+                records.add(object.asObjectRecord());
+            }
+        });
+        durableConfigurationStore.remove(records.toArray(new ConfiguredObjectRecord[records.size()]));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e697c9ed/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index da32b3d..4db5344 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -156,9 +156,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     private final AccessControlContext _housekeepingJobContext;
     private final AccessControlContext _fileSystemSpaceCheckerJobContext;
     private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
-    private TaskExecutor _preferenceTaskExecutor;
+    private volatile TaskExecutor _preferenceTaskExecutor;
+    private volatile boolean _deleteRequested;
 
-    private static enum BlockingType { STORE, FILESYSTEM };
+    private enum BlockingType { STORE, FILESYSTEM };
 
     private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
 
@@ -167,7 +168,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
 
-    private ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
+    private volatile ScheduledThreadPoolExecutor _houseKeepingTaskExecutor;
 
     private final Broker<?> _broker;
 
@@ -1151,7 +1152,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
-    protected void shutdownHouseKeeping()
+    private void shutdownHouseKeeping()
     {
         if(_houseKeepingTaskExecutor != null)
         {
@@ -1169,6 +1170,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                 _logger.warn("Interrupted during Housekeeping shutdown:", e);
                 Thread.currentThread().interrupt();
             }
+            finally
+            {
+                _houseKeepingTaskExecutor = null;
+            }
         }
     }
 
@@ -1511,7 +1516,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     protected ListenableFuture<Void> beforeClose()
     {
         setState(State.UNAVAILABLE);
-        _virtualHostLoggersToClose = new ArrayList(getChildren(VirtualHostLogger.class));
+        _virtualHostLoggersToClose = new ArrayList<>(getChildren(VirtualHostLogger.class));
         //Stop Connections
         return closeConnections();
     }
@@ -1521,9 +1526,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     {
         _dtxRegistry.close();
         shutdownHouseKeeping();
+
+        if (_deleteRequested)
+        {
+            deleteLinkRegistry();
+        }
+
         closeMessageStore();
         stopPreferenceTaskExecutor();
         closePreferenceStore();
+
+        if (_deleteRequested)
+        {
+            deleteMessageStore();
+            deletePreferenceStore();
+        }
+
         closeNetworkConnectionScheduler();
         _eventLogger.message(VirtualHostMessages.CLOSED(getName()));
 
@@ -1532,11 +1550,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
 
-    public ListenableFuture<Void> closeConnections()
+    private ListenableFuture<Void> closeConnections()
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Closing connection registry :" + _connections.size() + " connections.");
+            _logger.debug("Closing connection registry : {} connection(s).", _connections.size());
         }
         _acceptsConnections.set(false);
         for(AMQPConnection<?> conn : _connections)
@@ -1719,7 +1737,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
-    protected void reportIfError(State state)
+    private void reportIfError(State state)
     {
         if (state == State.ERRORED)
         {
@@ -2244,6 +2262,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         if (_preferenceTaskExecutor != null)
         {
             _preferenceTaskExecutor.stop();
+            _preferenceTaskExecutor = null;
         }
     }
 
@@ -2263,47 +2282,68 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
+    @SuppressWarnings("ignore")
     @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
+        _deleteRequested = true;
+
         return doAfterAlways(closeAsync(),
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        if (_linkRegistry != null)
-                        {
-                            _linkRegistry.delete();
-                        }
-                        MessageStore ms = getMessageStore();
-                        if (ms != null)
-                        {
-                            try
-                            {
-                                ms.onDelete(AbstractVirtualHost.this);
-                            }
-                            catch (Exception e)
-                            {
-                                _logger.warn("Exception occurred on message store deletion", e);
-                            }
-                        }
-                        PreferenceStore ps = _preferenceStore;
-                        if (ps != null)
-                        {
-                            try
-                            {
-                                ps.onDelete();
-                            }
-                            catch (Exception e)
-                            {
-                                _logger.warn("Exception occurred on preference store deletion", e);
-                            }
-                        }
-                        deleted();
-                        setState(State.DELETED);
-                    }
-                });
+                             () ->
+                             {
+                                 setState(State.DELETED);
+                                 deleted();
+                             });
+    }
+
+    private void deleteLinkRegistry()
+    {
+        if (_linkRegistry != null)
+        {
+            _linkRegistry.delete();
+            _linkRegistry = null;
+        }
+    }
+
+    private void deletePreferenceStore()
+    {
+        final PreferenceStore ps = _preferenceStore;
+        if (ps != null)
+        {
+            try
+            {
+                ps.onDelete();
+            }
+            catch (Exception e)
+            {
+                _logger.warn("Exception occurred on preference store deletion", e);
+            }
+            finally
+            {
+                _preferenceStore = null;
+
+            }
+        }
+    }
+
+    private void deleteMessageStore()
+    {
+        MessageStore ms = _messageStore;
+        if (ms != null)
+        {
+            try
+            {
+                ms.onDelete(AbstractVirtualHost.this);
+            }
+            catch (Exception e)
+            {
+                _logger.warn( "Exception occurred on message store deletion", e);
+            }
+            finally
+            {
+                _messageStore = null;
+            }
+        }
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org