Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/06/11 06:05:12 UTC

[GitHub] [accumulo] ctubbsii commented on a diff in pull request #2769: Modify snapshot synchronization

ctubbsii commented on code in PR #2769:
URL: https://github.com/apache/accumulo/pull/2769#discussion_r894981197


##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -48,11 +71,32 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final Supplier<SystemConfiguration> systemConfig;
 
+  private final ScheduledFuture<?> refreshTaskFuture;
+
+  private final DeleteWatcher deleteWatcher =
+      new DeleteWatcher(tableConfigs, namespaceConfigs, tableParentConfigs);

Review Comment:
   Could simplify this by making the class non-static; then it could refer to its enclosing class's fields directly, rather than having references to some of those fields passed in like this.
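A minimal sketch of the suggestion, assuming plain `String` ids and values as hypothetical stand-ins for `NamespaceId` and `NamespaceConfiguration` (the class name `ConfigFactorySketch` is also illustrative). A non-static inner class reaches the enclosing instance's map through its implicit outer reference, so its constructor needs no arguments:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for ServerConfigurationFactory:
// String ids and values replace NamespaceId and NamespaceConfiguration.
class ConfigFactorySketch {

  final Map<String,String> namespaceConfigs = new ConcurrentHashMap<>();

  // Non-static inner class: every instance carries an implicit reference to the
  // enclosing ConfigFactorySketch, so it reads namespaceConfigs directly instead
  // of receiving the map through its constructor.
  class DeleteWatcher {
    void namespaceDeleted(String namespaceId) {
      // Equivalent to ConfigFactorySketch.this.namespaceConfigs.remove(namespaceId)
      namespaceConfigs.remove(namespaceId);
    }
  }

  // Only the enclosing "this" is captured here, not the map itself.
  final DeleteWatcher deleteWatcher = new DeleteWatcher();

  public static void main(String[] args) {
    ConfigFactorySketch factory = new ConfigFactorySketch();
    factory.namespaceConfigs.put("ns1", "namespace config");
    factory.deleteWatcher.namespaceDeleted("ns1");
    System.out.println(factory.namespaceConfigs.containsKey("ns1")); // false
  }
}
```

Table deletes would follow the same pattern. The trade-off is that each watcher is bound to one factory instance, which is usually what is wanted for a listener that only mutates that factory's caches.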



##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -48,11 +71,32 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final Supplier<SystemConfiguration> systemConfig;
 
+  private final ScheduledFuture<?> refreshTaskFuture;
+
+  private final DeleteWatcher deleteWatcher =
+      new DeleteWatcher(tableConfigs, namespaceConfigs, tableParentConfigs);
+
   public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
     this.context = context;
     this.siteConfig = siteConfig;
     systemConfig = Suppliers.memoize(
         () -> new SystemConfiguration(context, SystemPropKey.of(context), getSiteConfiguration()));
+
+    if (context.threadPools() != null) {
+      // simplify testing - only create refresh thread when operating in a context with thread pool
+      // initialized
+      ScheduledThreadPoolExecutor threadPool = context.threadPools()
+          .createScheduledExecutorService(1, this.getClass().getSimpleName(), false);
+      Runnable refreshTask = this::verifySnapshotVersions;

Review Comment:
   I'm not sure I understand the point of running this scheduled task inside the ServerConfigurationFactory. This class is intended to be very lightweight, just keeping simple references to configurations. It shouldn't be responsible for the snapshots contained inside those configurations. That seems like a job for the ZooBasedConfiguration class.
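One possible shape for that, sketched under assumptions: `VersionSource` is a hypothetical abstraction over the ZooKeeper stat lookup that `refresh(...)` performs with `zooReader.getStatus(...)`, `SelfCheckingConfig` stands in for `ZooBasedConfiguration`, and the path `/namespaces/ns1` is illustrative; none of these are existing Accumulo APIs. The configuration compares its own snapshot version with the stored one, reloads on a mismatch, and reports a missing node so its owner can evict it:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical stand-in for a ZooBasedConfiguration that verifies its own snapshot,
// mirroring the version comparison done by refresh(...) in the diff.
class SelfCheckingConfig {
  private final String path;
  private final VersionSource source;
  private int snapshotVersion;
  private int reloads = 0;

  SelfCheckingConfig(String path, VersionSource source, int initialVersion) {
    this.path = path;
    this.source = source;
    this.snapshotVersion = initialVersion;
  }

  /**
   * Compares the locally held version with the stored one. Returns false when the
   * backing node no longer exists, so the owner can drop this configuration.
   */
  synchronized boolean verifySnapshot() {
    OptionalInt stored = source.versionOf(path);
    if (!stored.isPresent()) {
      return false;
    }
    if (stored.getAsInt() != snapshotVersion) {
      reload(stored.getAsInt());
    }
    return true;
  }

  private void reload(int newVersion) {
    // A real implementation would re-read the properties here; the sketch only
    // records the new version and counts reloads.
    snapshotVersion = newVersion;
    reloads++;
  }

  synchronized int version() {
    return snapshotVersion;
  }

  synchronized int reloads() {
    return reloads;
  }

  public static void main(String[] args) {
    Map<String,Integer> store = new HashMap<>(); // fake ZooKeeper: path -> data version
    store.put("/namespaces/ns1", 3);
    VersionSource source =
        p -> store.containsKey(p) ? OptionalInt.of(store.get(p)) : OptionalInt.empty();
    SelfCheckingConfig config = new SelfCheckingConfig("/namespaces/ns1", source, 3);

    config.verifySnapshot(); // versions match: no reload
    store.put("/namespaces/ns1", 4);
    config.verifySnapshot(); // mismatch: reload to version 4
    store.remove("/namespaces/ns1");
    System.out.println(config.reloads() + " " + config.verifySnapshot()); // 1 false
  }
}

// Hypothetical abstraction over a ZooKeeper stat lookup: empty means the node is gone.
interface VersionSource {
  OptionalInt versionOf(String path);
}
```

With this split, whatever owns the refresh schedule only needs to iterate its cached configurations and drop the ones whose check returns false.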



##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -48,11 +71,32 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final Supplier<SystemConfiguration> systemConfig;
 
+  private final ScheduledFuture<?> refreshTaskFuture;
+
+  private final DeleteWatcher deleteWatcher =
+      new DeleteWatcher(tableConfigs, namespaceConfigs, tableParentConfigs);
+
   public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
     this.context = context;
     this.siteConfig = siteConfig;
     systemConfig = Suppliers.memoize(
         () -> new SystemConfiguration(context, SystemPropKey.of(context), getSiteConfiguration()));
+
+    if (context.threadPools() != null) {
+      // simplify testing - only create refresh thread when operating in a context with thread pool

Review Comment:
   I'm not sure I understand what this simplifies. `context.threadPools()` is always non-null; the only difference is which uncaught exception handler it uses.
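A related detail: on a plain JDK `ScheduledExecutorService`, an exception thrown by a periodic task never reaches the thread's uncaught exception handler. It is captured in the task's `ScheduledFuture`, and later executions are suppressed. That is consistent with the Javadoc on `verifySnapshotVersions`, which says it does not propagate exceptions so the scheduled task keeps running. This is a generic JDK illustration, not Accumulo's `ThreadPools` (whose wrapping may differ), and it assumes fixed-delay scheduling because the scheduling call is cut off in the quoted diff:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicFailureDemo {

  /** Schedules a task every 10 ms for about 200 ms; the second run throws. */
  static int countRuns(boolean guarded) throws InterruptedException {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    Runnable body = () -> {
      if (runs.incrementAndGet() == 2) {
        throw new IllegalStateException("simulated ZooKeeper failure");
      }
    };
    // The guarded variant swallows the failure (a real task would log it), so the
    // executor keeps rescheduling it; the unguarded variant is silently cancelled.
    Runnable task = guarded ? () -> {
      try {
        body.run();
      } catch (RuntimeException e) {
        // log and continue with the next scheduled run
      }
    } : body;
    pool.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
    Thread.sleep(200);
    pool.shutdownNow();
    pool.awaitTermination(1, TimeUnit.SECONDS);
    return runs.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("unguarded runs: " + countRuns(false)); // stops after the failing run
    System.out.println("guarded runs: " + countRuns(true)); // keeps running
  }
}
```

Nothing is printed for the unguarded failure: the exception stays inside the future unless someone calls `get()` on it.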



##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -99,10 +144,139 @@ public NamespaceConfiguration getNamespaceConfigurationForTable(TableId tableId)
   @Override
   public NamespaceConfiguration getNamespaceConfiguration(NamespaceId namespaceId) {
     return namespaceConfigs.computeIfAbsent(namespaceId, key -> {
+      context.getPropStore().registerAsListener(NamespacePropKey.of(context, namespaceId),
+          deleteWatcher);
       var conf = new NamespaceConfiguration(context, namespaceId, getSystemConfiguration());
       ConfigCheckUtil.validate(conf);
       return conf;
     });
   }
 
+  /**
+   * Check that the stored version in ZooKeeper matches the version held in the local snapshot. When
+   * a mismatch is detected, a change event is sent to the prop store which will cause a re-load. If
+   * the Zookeeper node has been deleted, the local cache entries are removed.
+   * <p>
+   * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+   * so the scheduled tasks will continue to run.
+   */
+  private void verifySnapshotVersions() {
+
+    int nsRefreshCount = 0;
+    int tblRefreshCount = 0;
+
+    long refreshStart = System.nanoTime();
+
+    ZooReader zooReader = context.getZooReader();
+
+    try {
+      refresh(systemConfig.get(), zooReader);
+    } catch (Throwable t) {
+      log.debug("Exception occurred during system config refresh", t.getCause());
+    }
+
+    try {
+      for (Map.Entry<NamespaceId,NamespaceConfiguration> entry : namespaceConfigs.entrySet()) {
+        if (!refresh(entry.getValue(), zooReader)) {
+          namespaceConfigs.remove(entry.getKey());
+        }
+        nsRefreshCount++;
+      }
+    } catch (Throwable t) {
+      log.debug(
+          "Exception occurred during namespace refresh - cycle may not have completed on this pass",
+          t.getCause());
+    }
+    try {
+      for (Map.Entry<TableId,TableConfiguration> entry : tableConfigs.entrySet()) {
+        if (!refresh(entry.getValue(), zooReader)) {
+          tableConfigs.remove(entry.getKey());
+          tableParentConfigs.remove(entry.getKey());
+        }
+        tblRefreshCount++;
+      }
+    } catch (Throwable t) {
+      log.debug(
+          "Exception occurred during table refresh - cycle may not have completed on this pass",
+          t.getCause());
+    }
+
+    log.debug(
+        "configuration snapshot refresh completed. Total runtime {} ms for local namespaces: {}, tables: {}",
+        MILLISECONDS.convert(System.nanoTime() - refreshStart, NANOSECONDS), nsRefreshCount,
+        tblRefreshCount);
+  }
+
+  private boolean refresh(ZooBasedConfiguration config, ZooReader zooReader) {
+    final PropStoreKey<?> key = config.getPropStoreKey();
+    try {
+      Stat stat = zooReader.getStatus(key.getPath());
+      log.trace("configuration snapshot refresh: stat returned: {} for {}", stat, key);
+      if (stat == null) {
+        return false;
+      }
+      if (config.getDataVersion() != stat.getVersion()) {
+        log.debug(
+            "configuration snapshot refresh - difference found. forcing configuration update for {}}",
+            key);
+        config.zkChangeEvent(key);
+      }
+      // add small jitter between calls.
+      int randDelay = ThreadLocalRandom.current().nextInt(0, 23);
+      Thread.sleep(randDelay);
+    } catch (KeeperException.NoNodeException ex) {
+      config.zkChangeEvent(key);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted reading from ZooKeeper during snapshot refresh.",
+          ex);
+    } catch (KeeperException | IllegalStateException ex) {
+      log.debug("configuration snapshot refresh: Exception during refresh - key" + key, ex);
+    }
+    return true;
+  }
+
+  private static class DeleteWatcher implements PropChangeListener {
+
+    final Map<TableId,TableConfiguration> tableConfigs;
+    final Map<NamespaceId,NamespaceConfiguration> namespaceConfigs;
+    final Map<TableId,NamespaceConfiguration> tableParentConfigs;
+
+    DeleteWatcher(final Map<TableId,TableConfiguration> tableConfigs,
+        final Map<NamespaceId,NamespaceConfiguration> namespaceConfigs,
+        final Map<TableId,NamespaceConfiguration> tableParentConfigs) {
+      this.tableConfigs = tableConfigs;
+      this.namespaceConfigs = namespaceConfigs;
+      this.tableParentConfigs = tableParentConfigs;
+    }
+
+    @Override
+    public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
+      // no-op. changes handled by prop store impl
+    }
+
+    @Override
+    public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
+      // no-op. changes handled by prop store impl
+    }
+
+    @Override
+    public void deleteEvent(PropStoreKey<?> propStoreKey) {
+      if (propStoreKey instanceof NamespacePropKey) {
+        log.trace("configuration snapshot refresh: Handle namespace delete for {}", propStoreKey);
+        namespaceConfigs.remove(((NamespacePropKey) propStoreKey).getId());
+        return;
+      }
+      if (propStoreKey instanceof TablePropKey) {
+        log.trace("configuration snapshot refresh: Handle table delete for {}", propStoreKey);
+        tableConfigs.remove(((TablePropKey) propStoreKey).getId());
+        tableConfigs.remove(((TablePropKey) propStoreKey).getId());

Review Comment:
   This entry is removed from `tableConfigs` twice. I think the intent was:
   
   ```suggestion
           tableConfigs.remove(((TablePropKey) propStoreKey).getId());
           tableParentConfigs.remove(((TablePropKey) propStoreKey).getId());
   ```
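A minimal sketch of the intended eviction, with `String` ids and values as hypothetical stand-ins for `TableId`, `TableConfiguration` and `NamespaceConfiguration`. The same pair of removals also appears in `verifySnapshotVersions` when a table's node is gone, so a single helper (the name `evictTable` is illustrative) is one way to keep both call sites consistent:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the factory's table caches.
class TableCacheSketch {
  final Map<String,String> tableConfigs = new ConcurrentHashMap<>();
  final Map<String,String> tableParentConfigs = new ConcurrentHashMap<>();

  // One helper for both call sites (the delete listener and the periodic refresh)
  // keeps the two maps from drifting apart.
  void evictTable(String tableId) {
    tableConfigs.remove(tableId);
    tableParentConfigs.remove(tableId);
  }

  public static void main(String[] args) {
    TableCacheSketch cache = new TableCacheSketch();
    cache.tableConfigs.put("t1", "table config");
    cache.tableParentConfigs.put("t1", "namespace config");
    cache.evictTable("t1");
    // With the duplicated tableConfigs.remove(...) line, this entry would remain.
    System.out.println(cache.tableParentConfigs.containsKey("t1")); // false
  }
}
```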



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
