You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2022/06/15 12:48:34 UTC
[accumulo] branch main updated: Add delete event watcher to ServerConfigurationFactory (#2773)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0bc41827a5 Add delete event watcher to ServerConfigurationFactory (#2773)
0bc41827a5 is described below
commit 0bc41827a59fcff585fa8b349f2dde2746749c5c
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Wed Jun 15 12:48:30 2022 +0000
Add delete event watcher to ServerConfigurationFactory (#2773)
This partially satisfies PR #2769.
---
.../server/conf/ServerConfigurationFactory.java | 43 +++++++++++++++++
.../accumulo/test/conf/PropStoreConfigIT.java | 56 ++++++++++++++++++++++
2 files changed, 99 insertions(+)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 6341b45e72..3e4c935878 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -30,7 +30,13 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.NamespacePropKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStoreKey;
import org.apache.accumulo.server.conf.store.SystemPropKey;
+import org.apache.accumulo.server.conf.store.TablePropKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Suppliers;
@@ -38,6 +44,7 @@ import com.google.common.base.Suppliers;
* A factor for configurations used by a server process. Instance of this class are thread-safe.
*/
public class ServerConfigurationFactory extends ServerConfiguration {
+ private final static Logger log = LoggerFactory.getLogger(ServerConfigurationFactory.class);
private final Map<TableId,NamespaceConfiguration> tableParentConfigs = new ConcurrentHashMap<>();
private final Map<TableId,TableConfiguration> tableConfigs = new ConcurrentHashMap<>();
@@ -48,6 +55,8 @@ public class ServerConfigurationFactory extends ServerConfiguration {
private final SiteConfiguration siteConfig;
private final Supplier<SystemConfiguration> systemConfig;
+ private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+
public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
this.context = context;
this.siteConfig = siteConfig;
@@ -76,6 +85,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
public TableConfiguration getTableConfiguration(TableId tableId) {
return tableConfigs.computeIfAbsent(tableId, key -> {
if (context.tableNodeExists(tableId)) {
+ context.getPropStore().registerAsListener(TablePropKey.of(context, tableId), deleteWatcher);
var conf =
new TableConfiguration(context, tableId, getNamespaceConfigurationForTable(tableId));
ConfigCheckUtil.validate(conf);
@@ -99,10 +109,43 @@ public class ServerConfigurationFactory extends ServerConfiguration {
@Override
public NamespaceConfiguration getNamespaceConfiguration(NamespaceId namespaceId) {
return namespaceConfigs.computeIfAbsent(namespaceId, key -> {
+ context.getPropStore().registerAsListener(NamespacePropKey.of(context, namespaceId),
+ deleteWatcher); // deleteWatcher removes this namespace's cached entry on a delete event
var conf = new NamespaceConfiguration(context, namespaceId, getSystemConfiguration());
ConfigCheckUtil.validate(conf);
return conf;
});
}
+ private class DeleteWatcher implements PropChangeListener {
+ // Registered with the prop store for namespace and table prop keys; only delete events are acted on.
+ @Override
+ public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
+ // no-op. changes handled by prop store impl
+ }
+
+ @Override
+ public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
+ // no-op. changes handled by prop store impl
+ }
+ // Removes cached configuration entries for a deleted namespace or table; other key types are ignored.
+ @Override
+ public void deleteEvent(PropStoreKey<?> propStoreKey) {
+ if (propStoreKey instanceof NamespacePropKey) {
+ log.trace("configuration snapshot refresh: Handle namespace delete for {}", propStoreKey);
+ namespaceConfigs.remove(((NamespacePropKey) propStoreKey).getId());
+ return;
+ }
+ if (propStoreKey instanceof TablePropKey) {
+ log.trace("configuration snapshot refresh: Handle table delete for {}", propStoreKey);
+ tableConfigs.remove(((TablePropKey) propStoreKey).getId());
+ tableParentConfigs.remove(((TablePropKey) propStoreKey).getId()); // also drop the table's cached parent namespace config
+ }
+ }
+
+ @Override
+ public void connectionEvent() {
+ // no-op. changes handled by prop store impl
+ }
+ }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
index 5eb7c25cdd..f386c6d152 100644
--- a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT.java
@@ -23,6 +23,8 @@ import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
@@ -75,6 +77,60 @@ public class PropStoreConfigIT extends AccumuloClusterHarness {
}
}
+ @Test
+ public void deletePropsTest() throws Exception {
+ String[] names = getUniqueNames(2);
+ String namespace = names[0];
+ String table = namespace + "." + names[1];
+ // table name is qualified with the namespace name created below
+ try (var client = Accumulo.newClient().from(getClientProps()).build()) {
+ // create the namespace first, then the table inside it
+ client.namespaceOperations().create(namespace);
+ client.tableOperations().create(table);
+
+ log.info("Tables: {}", client.tableOperations().list());
+ // set the bloom-enabled property through both instance operations and table operations
+ client.instanceOperations().setProperty(Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ client.tableOperations().setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ // NOTE(review): fixed 1s pause, presumably to let the property changes propagate - confirm
+ Thread.sleep(SECONDS.toMillis(1L));
+ // every table property whose key contains the bloom-enabled name must read back as "true"
+ var props = client.tableOperations().getProperties(table);
+ log.info("Props: {}", props);
+ for (Map.Entry<String,String> e : props) {
+ if (e.getKey().contains("table.bloom.enabled")) {
+ log.info("after bloom property: {}={}", e.getKey(), e.getValue());
+ assertEquals("true", e.getValue());
+ }
+ }
+ // name -> id maps, used to build the prop store keys for the namespace and table
+ var tableIdMap = client.tableOperations().tableIdMap();
+ var nsIdMap = client.namespaceOperations().namespaceIdMap();
+
+ ServerContext context = getServerContext();
+
+ NamespaceId nid = NamespaceId.of(nsIdMap.get(namespace));
+ TableId tid = TableId.of(tableIdMap.get(table));
+
+ // check zk nodes exist
+ assertTrue(context.getPropStore().exists(NamespacePropKey.of(context, nid)));
+ assertTrue(context.getPropStore().exists(TablePropKey.of(context, tid)));
+ // check ServerConfigurationFactory
+ assertNotNull(context.getNamespaceConfiguration(nid));
+ assertNotNull(context.getTableConfiguration(tid));
+ // delete the table before its namespace
+ client.tableOperations().delete(table);
+ client.namespaceOperations().delete(namespace);
+ Thread.sleep(100); // NOTE(review): fixed pause, presumably waiting for the delete to be observed - may be timing sensitive
+
+ // check zk nodes deleted
+ assertFalse(context.getPropStore().exists(NamespacePropKey.of(context, nid)));
+ assertFalse(context.getPropStore().exists(TablePropKey.of(context, tid)));
+ // check ServerConfigurationFactory deleted - should return null
+ // NOTE(review): only the table configuration is asserted; the namespace configuration is not checked
+ assertNull(context.getTableConfiguration(tid));
+ }
+ }
+
/**
* Validate that property nodes have an ACL set to restrict world access.
*/