Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/06/16 06:43:38 UTC

[GitHub] [accumulo] ctubbsii commented on a diff in pull request #2778: Add version check to ZooPropStore

ctubbsii commented on code in PR #2778:
URL: https://github.com/apache/accumulo/pull/2778#discussion_r898741809


##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -391,4 +416,61 @@ public PropCache getCache() {
   public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
     return cache.getWithoutCaching(propStoreKey);
   }
+
+  /**
+   * Check that the stored version in ZooKeeper matches the version held in the local snapshot. When
+   * a mismatch is detected, a change event is sent to the prop store which will cause a re-load. If
+   * the Zookeeper node has been deleted, the local cache entries are removed.
+   * <p>
+   * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+   * other than interrupted Exceptions so the scheduled tasks will continue to run.
+   */
+  @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+      justification = "random number not used in secure context")
+  private void verifySnapshotVersions() {
+    long refreshStart = System.nanoTime();
+    int keyCount = 0;
+    int keyChangedCount = 0;
+
+    var cacheView = cache.asMap();
+    for (Map.Entry<PropStoreKey<?>,VersionedProperties> entry : cacheView.entrySet()) {
+      keyCount++;
+      var key = entry.getKey();
+      if (versionChanged(key, entry.getValue())) {
+        keyChangedCount++;
+        propStoreWatcher.signalZkChangeEvent(key);
+        log.debug("data version sync: difference found. forcing configuration update for {}}", key);
+      }
+      // add small jitter between calls.
+      int randDelay = ThreadLocalRandom.current().nextInt(0, MAX_JITTER_DELAY);
+      try {
+        Thread.sleep(randDelay);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+    log.debug("data version sync: Total runtime {} ms for {} entries, changes detected: {}",
+        MILLISECONDS.convert(System.nanoTime() - refreshStart, NANOSECONDS), keyCount,

Review Comment:
   Please avoid `DESIREDUNIT.convert(diffInUnit, CURRENTUNIT)`. Use `CURRENTUNIT.toDESIREDUNIT(diff)` instead, as in `NANOSECONDS.toMillis(diff)`. It's much more intuitive and far less ambiguous (you never have to ask which way the conversion is going), and if I remember correctly, we've previously swept through the code to get rid of these.
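
To illustrate the two styles side by side, here is a minimal sketch. The class and method names are hypothetical and exist only for this example; they are not part of the PR.

```java
import java.util.concurrent.TimeUnit;

final class ElapsedTimeExample {

  // Preferred form: the source unit is the receiver and the target unit is in the
  // method name, so the direction of the conversion is obvious at the call site.
  static long elapsedMillis(long startNanos, long endNanos) {
    return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
  }

  // Equivalent result, but the reader has to remember that the second argument
  // of convert() is the SOURCE unit and the receiver is the TARGET unit.
  static long elapsedMillisViaConvert(long startNanos, long endNanos) {
    return TimeUnit.MILLISECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
  }

  public static void main(String[] args) {
    long start = System.nanoTime();
    long end = start + 2_500_000_000L; // pretend 2.5 seconds have elapsed
    System.out.println(elapsedMillis(start, end));
    System.out.println(elapsedMillisViaConvert(start, end));
  }
}
```

Both methods truncate toward zero, so the change is about readability only and should not alter the logged value.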



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java:
##########
@@ -113,6 +115,17 @@ public void removeAll() {
     return cache.getIfPresent(propStoreKey);
   }
 
+  /**
+   * This returns a weakly consistent view of the entries in the cache - changes may or may not be
+   * reflected in the view and it is undefined which changes (including eviction) will be visible to
+   * the view.
+   *
+   * @return a map weakly consistent view of the underlying cache entries.
+   */
+  public Map<PropStoreKey<?>,VersionedProperties> asMap() {
+    return Collections.unmodifiableMap(cache.asMap());
+  }

Review Comment:
   Would `Map.copyOf(cache.asMap())` be better to provide an immutable snapshot of the current cache?
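
For context, a minimal sketch of how the two wrappers differ. A `ConcurrentHashMap` stands in for the map returned by the Caffeine cache, and the class and method names are hypothetical, chosen only for this example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SnapshotVersusViewExample {

  // Read-only VIEW: callers cannot modify it, but later changes to the
  // backing map still show through.
  static Map<String,Integer> liveView(Map<String,Integer> backing) {
    return Collections.unmodifiableMap(backing);
  }

  // Immutable COPY: the entries are fixed at the moment of the call.
  // Map.copyOf (Java 10+) rejects null keys and values.
  static Map<String,Integer> snapshot(Map<String,Integer> backing) {
    return Map.copyOf(backing);
  }

  public static void main(String[] args) {
    // Assumption: a ConcurrentHashMap is used here in place of the real cache map.
    Map<String,Integer> backing = new ConcurrentHashMap<>();
    backing.put("table.a", 1);

    Map<String,Integer> view = liveView(backing);
    Map<String,Integer> copy = snapshot(backing);

    backing.put("table.b", 2); // mutate after both wrappers were created

    System.out.println("view size: " + view.size()); // sees the new entry
    System.out.println("copy size: " + copy.size()); // does not
  }
}
```

The trade-off is that `Map.copyOf` copies every entry on each call, whereas the unmodifiable view is cheap but only weakly consistent while it is being iterated.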



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java:
##########
@@ -52,8 +54,8 @@ public class PropCacheCaffeineImpl implements PropCache {
   private PropCacheCaffeineImpl(final CacheLoader<PropStoreKey<?>,VersionedProperties> cacheLoader,
       final PropStoreMetrics metrics, final Ticker ticker, boolean runTasksInline) {
     this.metrics = metrics;
-    var builder = Caffeine.newBuilder().refreshAfterWrite(REFRESH_MIN, BASE_TIME_UNITS)
-        .expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS).evictionListener(this::evictionNotifier);
+    var builder = Caffeine.newBuilder().expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS)
+        .evictionListener(this::evictionNotifier);

Review Comment:
   What's the motivation for removing `refreshAfterWrite` here? I'm not sure I understand what this was doing before, never mind why this change might be better.



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -391,4 +416,61 @@ public PropCache getCache() {
   public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
     return cache.getWithoutCaching(propStoreKey);
   }
+
+  /**
+   * Check that the stored version in ZooKeeper matches the version held in the local snapshot. When
+   * a mismatch is detected, a change event is sent to the prop store which will cause a re-load. If
+   * the Zookeeper node has been deleted, the local cache entries are removed.
+   * <p>
+   * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+   * other than interrupted Exceptions so the scheduled tasks will continue to run.
+   */
+  @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+      justification = "random number not used in secure context")
+  private void verifySnapshotVersions() {
+    long refreshStart = System.nanoTime();
+    int keyCount = 0;
+    int keyChangedCount = 0;
+
+    var cacheView = cache.asMap();
+    for (Map.Entry<PropStoreKey<?>,VersionedProperties> entry : cacheView.entrySet()) {
+      keyCount++;
+      var key = entry.getKey();
+      if (versionChanged(key, entry.getValue())) {
+        keyChangedCount++;
+        propStoreWatcher.signalZkChangeEvent(key);
+        log.debug("data version sync: difference found. forcing configuration update for {}}", key);
+      }
+      // add small jitter between calls.
+      int randDelay = ThreadLocalRandom.current().nextInt(0, MAX_JITTER_DELAY);
+      try {
+        Thread.sleep(randDelay);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+    log.debug("data version sync: Total runtime {} ms for {} entries, changes detected: {}",

Review Comment:
   I'm going to assume the timing information will be moved to the trace level, deleted, or wrapped in an OpenTelemetry trace scope, rather than left in at debug. Otherwise, this could get quite spammy in the debug logs.



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -391,4 +416,61 @@ public PropCache getCache() {
   public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
     return cache.getWithoutCaching(propStoreKey);
   }
+
+  /**
+   * Check that the stored version in ZooKeeper matches the version held in the local snapshot. When
+   * a mismatch is detected, a change event is sent to the prop store which will cause a re-load. If
+   * the Zookeeper node has been deleted, the local cache entries are removed.
+   * <p>
+   * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+   * other than interrupted Exceptions so the scheduled tasks will continue to run.
+   */
+  @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+      justification = "random number not used in secure context")
+  private void verifySnapshotVersions() {
+    long refreshStart = System.nanoTime();
+    int keyCount = 0;
+    int keyChangedCount = 0;
+
+    var cacheView = cache.asMap();
+    for (Map.Entry<PropStoreKey<?>,VersionedProperties> entry : cacheView.entrySet()) {
+      keyCount++;
+      var key = entry.getKey();
+      if (versionChanged(key, entry.getValue())) {
+        keyChangedCount++;
+        propStoreWatcher.signalZkChangeEvent(key);
+        log.debug("data version sync: difference found. forcing configuration update for {}}", key);
+      }
+      // add small jitter between calls.
+      int randDelay = ThreadLocalRandom.current().nextInt(0, MAX_JITTER_DELAY);
+      try {
+        Thread.sleep(randDelay);
+      } catch (InterruptedException ex) {

Review Comment:
   It's not clear why this is introducing a delay between each node that is being verified.



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -391,4 +416,61 @@ public PropCache getCache() {
   public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
     return cache.getWithoutCaching(propStoreKey);
   }
+
+  /**
+   * Check that the stored version in ZooKeeper matches the version held in the local snapshot. When
+   * a mismatch is detected, a change event is sent to the prop store which will cause a re-load. If
+   * the Zookeeper node has been deleted, the local cache entries are removed.
+   * <p>
+   * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+   * other than interrupted Exceptions so the scheduled tasks will continue to run.
+   */
+  @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+      justification = "random number not used in secure context")
+  private void verifySnapshotVersions() {
+    long refreshStart = System.nanoTime();
+    int keyCount = 0;
+    int keyChangedCount = 0;
+
+    var cacheView = cache.asMap();
+    for (Map.Entry<PropStoreKey<?>,VersionedProperties> entry : cacheView.entrySet()) {
+      keyCount++;
+      var key = entry.getKey();
+      if (versionChanged(key, entry.getValue())) {
+        keyChangedCount++;
+        propStoreWatcher.signalZkChangeEvent(key);
+        log.debug("data version sync: difference found. forcing configuration update for {}}", key);
+      }
+      // add small jitter between calls.
+      int randDelay = ThreadLocalRandom.current().nextInt(0, MAX_JITTER_DELAY);
+      try {
+        Thread.sleep(randDelay);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+    log.debug("data version sync: Total runtime {} ms for {} entries, changes detected: {}",
+        MILLISECONDS.convert(System.nanoTime() - refreshStart, NANOSECONDS), keyCount,
+        keyChangedCount);
+  }
+
+  private boolean versionChanged(PropStoreKey<?> key, VersionedProperties vProps) {
+    try {
+      Stat stat = zrw.getStatus(key.getPath());
+      log.trace("data version sync: stat returned: {} for {}", stat, key);
+      if (stat == null) {
+        return true;
+      }
+      if (vProps.getDataVersion() != stat.getVersion()) {
+        return true;
+      }
+    } catch (Exception ex) {

Review Comment:
   Instead of a blanket catch clause, it's probably better to be explicit about what you're catching, especially if you are intentionally catching RuntimeExceptions:
   
   ```suggestion
       } catch (KeeperException | InterruptedException | RuntimeException ex) {
   ```
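
A self-contained sketch of the explicit multi-catch, for illustration only. `KeeperException` and `VersionLookup` below are hypothetical stand-ins so the example compiles without ZooKeeper, and returning `true` on failure is an assumption, since the body of the catch block is not shown in the diff above.

```java
final class ExplicitCatchExample {

  // Hypothetical stand-in for ZooKeeper's checked KeeperException.
  static class KeeperException extends Exception {
    KeeperException(String msg) {
      super(msg);
    }
  }

  // Hypothetical stand-in for a ZooKeeper stat lookup.
  interface VersionLookup {
    int dataVersion(String path) throws KeeperException, InterruptedException;
  }

  static boolean versionChanged(VersionLookup zk, String path, int cachedVersion) {
    try {
      return zk.dataVersion(path) != cachedVersion;
    } catch (KeeperException | InterruptedException | RuntimeException ex) {
      // The caught types are listed explicitly, so it is clear that
      // RuntimeException is handled on purpose.
      if (ex instanceof InterruptedException) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
      }
      // Assumption: a failed lookup is treated as "changed" to force a reload.
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(versionChanged(p -> 3, "/config", 3));
    System.out.println(versionChanged(p -> {
      throw new KeeperException("node missing");
    }, "/config", 3));
  }
}
```

Unlike `catch (Exception ex)`, this form makes the compiler flag any new checked exception the try block starts throwing, instead of silently absorbing it.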



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org