You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2022/06/15 12:48:34 UTC

[accumulo] branch main updated: Add delete event watcher to ServerConfigurationFactory (#2773)

This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bc41827a5 Add delete event watcher to ServerConfigurationFactory (#2773)
0bc41827a5 is described below

commit 0bc41827a59fcff585fa8b349f2dde2746749c5c
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Wed Jun 15 12:48:30 2022 +0000

    Add delete event watcher to ServerConfigurationFactory (#2773)
    
    This partially satisfies PR #2769.
---
 .../server/conf/ServerConfigurationFactory.java    | 43 +++++++++++++++++
 .../accumulo/test/conf/PropStoreConfigIT.java      | 56 ++++++++++++++++++++++
 2 files changed, 99 insertions(+)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 6341b45e72..3e4c935878 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -30,7 +30,13 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.NamespacePropKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStoreKey;
 import org.apache.accumulo.server.conf.store.SystemPropKey;
+import org.apache.accumulo.server.conf.store.TablePropKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Suppliers;
 
@@ -38,6 +44,7 @@ import com.google.common.base.Suppliers;
  * A factor for configurations used by a server process. Instance of this class are thread-safe.
  */
 public class ServerConfigurationFactory extends ServerConfiguration {
+  private final static Logger log = LoggerFactory.getLogger(ServerConfigurationFactory.class);
 
   private final Map<TableId,NamespaceConfiguration> tableParentConfigs = new ConcurrentHashMap<>();
   private final Map<TableId,TableConfiguration> tableConfigs = new ConcurrentHashMap<>();
@@ -48,6 +55,8 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final Supplier<SystemConfiguration> systemConfig;
 
+  private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+
   public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
     this.context = context;
     this.siteConfig = siteConfig;
@@ -76,6 +85,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   public TableConfiguration getTableConfiguration(TableId tableId) {
     return tableConfigs.computeIfAbsent(tableId, key -> {
       if (context.tableNodeExists(tableId)) {
+        context.getPropStore().registerAsListener(TablePropKey.of(context, tableId), deleteWatcher);
         var conf =
             new TableConfiguration(context, tableId, getNamespaceConfigurationForTable(tableId));
         ConfigCheckUtil.validate(conf);
@@ -99,10 +109,43 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   @Override
   public NamespaceConfiguration getNamespaceConfiguration(NamespaceId namespaceId) {
     return namespaceConfigs.computeIfAbsent(namespaceId, key -> {
+      context.getPropStore().registerAsListener(NamespacePropKey.of(context, namespaceId),
+          deleteWatcher);
       var conf = new NamespaceConfiguration(context, namespaceId, getSystemConfiguration());
       ConfigCheckUtil.validate(conf);
       return conf;
     });
   }
 
+  private class DeleteWatcher implements PropChangeListener {
+
+    @Override
+    public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
+      // no-op. changes handled by prop store impl
+    }
+
+    @Override
+    public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
+      // no-op. changes handled by prop store impl
+    }
+
+    @Override
+    public void deleteEvent(PropStoreKey<?> propStoreKey) {
+      if (propStoreKey instanceof NamespacePropKey) {
+        log.trace("configuration snapshot refresh: Handle namespace delete for {}", propStoreKey);
+        namespaceConfigs.remove(((NamespacePropKey) propStoreKey).getId());
+        return;
+      }
+      if (propStoreKey instanceof TablePropKey) {
+        log.trace("configuration snapshot refresh: Handle table delete for {}", propStoreKey);
+        tableConfigs.remove(((TablePropKey) propStoreKey).getId());
+        tableParentConfigs.remove(((TablePropKey) propStoreKey).getId());
+      }
+    }
+
+    @Override
+    public void connectionEvent() {
+      // no-op. changes handled by prop store impl
+    }
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
index 5eb7c25cdd..f386c6d152 100644
--- a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
@@ -23,6 +23,8 @@ import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Map;
@@ -75,6 +77,60 @@ public class PropStoreConfigIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void deletePropsTest() throws Exception {
+    String[] names = getUniqueNames(2);
+    String namespace = names[0];
+    String table = namespace + "." + names[1];
+
+    try (var client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      client.namespaceOperations().create(namespace);
+      client.tableOperations().create(table);
+
+      log.info("Tables: {}", client.tableOperations().list());
+
+      client.instanceOperations().setProperty(Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+      client.tableOperations().setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+
+      Thread.sleep(SECONDS.toMillis(1L));
+
+      var props = client.tableOperations().getProperties(table);
+      log.info("Props: {}", props);
+      for (Map.Entry<String,String> e : props) {
+        if (e.getKey().contains("table.bloom.enabled")) {
+          log.info("after bloom property: {}={}", e.getKey(), e.getValue());
+          assertEquals("true", e.getValue());
+        }
+      }
+
+      var tableIdMap = client.tableOperations().tableIdMap();
+      var nsIdMap = client.namespaceOperations().namespaceIdMap();
+
+      ServerContext context = getServerContext();
+
+      NamespaceId nid = NamespaceId.of(nsIdMap.get(namespace));
+      TableId tid = TableId.of(tableIdMap.get(table));
+
+      // check zk nodes exist
+      assertTrue(context.getPropStore().exists(NamespacePropKey.of(context, nid)));
+      assertTrue(context.getPropStore().exists(TablePropKey.of(context, tid)));
+      // check ServerConfigurationFactory
+      assertNotNull(context.getNamespaceConfiguration(nid));
+      assertNotNull(context.getTableConfiguration(tid));
+
+      client.tableOperations().delete(table);
+      client.namespaceOperations().delete(namespace);
+      Thread.sleep(100);
+
+      // check zk nodes deleted
+      assertFalse(context.getPropStore().exists(NamespacePropKey.of(context, nid)));
+      assertFalse(context.getPropStore().exists(TablePropKey.of(context, tid)));
+      // check ServerConfigurationFactory deleted - should return null
+      assertNull(context.getTableConfiguration(tid));
+    }
+  }
+
   /**
    * Validate that property nodes have an ACL set to restrict world access.
    */