You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/03/24 14:05:50 UTC

[accumulo] branch main updated: Add trace and debug log to consistency check (#2583)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new dd81d60  Add trace and debug log to consistency check (#2583)
dd81d60 is described below

commit dd81d608bff88835676e756ffcb2a2f6b69c9be8
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Mar 24 10:05:44 2022 -0400

    Add trace and debug log to consistency check (#2583)
    
    * Closes #2577
    * Add a trace span and a time measurement around the tablet metadata
    consistency check so we get an idea of how long metadata scans are taking
    * Create new property tserver.health.check.interval to make the consistency
    check interval configurable (default 30m; previously a random 30-60 minute delay)
    * Create new method watchCriticalFixedDelay() in ThreadPools
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../org/apache/accumulo/core/conf/Property.java    |  2 +
 .../accumulo/core/util/threads/ThreadPools.java    |  7 ++
 .../org/apache/accumulo/tserver/TabletServer.java  | 80 +++++++++++++---------
 3 files changed, 57 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 667d78b..06131bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -656,6 +656,8 @@ public enum Property {
       "1.4.0"),
   TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION,
       "The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
+  TSERV_HEALTH_CHECK_FREQ("tserver.health.check.interval", "30m", PropertyType.TIMEDURATION,
+      "The time between tablet server health checks.", "2.1.0"),
   TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests.", "1.4.0"),
   TSERV_MINTHREADS_TIMEOUT("tserver.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 73cb308..548852a 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -166,6 +166,13 @@ public class ThreadPools {
     CRITICAL_RUNNING_TASKS.add(future);
   }
 
+  public static void watchCriticalFixedDelay(AccumuloConfiguration aconf, long intervalMillis,
+      Runnable runnable) { // intervalMillis: delay before first run and between runs (ms)
+    ScheduledFuture<?> future = getServerThreadPools().createGeneralScheduledExecutorService(aconf)
+        .scheduleWithFixedDelay(runnable, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
+    CRITICAL_RUNNING_TASKS.add(future); // tracked as critical, same as the method above
+  }
+
   public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) {
     NON_CRITICAL_RUNNING_TASKS.add(future);
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3de8187..8c33967 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -23,12 +23,17 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalFixedDelay;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalScheduledTask;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.security.SecureRandom;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -164,6 +169,9 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
 public class TabletServer extends AbstractServer {
 
   private static final SecureRandom random = new SecureRandom();
@@ -287,7 +295,7 @@ public class TabletServer extends AbstractServer {
               }
             }
           }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
-      ThreadPools.watchNonCriticalScheduledTask(future);
+      watchNonCriticalScheduledTask(future);
     }
 
     ScheduledFuture<?> future = context.getScheduledExecutor()
@@ -304,7 +312,7 @@ public class TabletServer extends AbstractServer {
             }
           }
         }), 5, 5, TimeUnit.SECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
+    watchNonCriticalScheduledTask(future);
 
     @SuppressWarnings("deprecation")
     final long walMaxSize =
@@ -352,7 +360,7 @@ public class TabletServer extends AbstractServer {
     this.resourceManager = new TabletServerResourceManager(context);
     this.security = AuditedSecurityOperation.getInstance(context);
 
-    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+    watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
         TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS));
     walMarker = new WalStateManager(context);
 
@@ -803,38 +811,46 @@ public class TabletServer extends AbstractServer {
         }
       }
     }, 0, 5, TimeUnit.SECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
+    watchNonCriticalScheduledTask(future);
 
-    int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
+    long tabletCheckFrequency = aconf.getTimeInMillis(Property.TSERV_HEALTH_CHECK_FREQ);
     // Periodically check that metadata of tablets matches what is held in memory
-    ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools()
-        .createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
-          final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
-
-          Map<KeyExtent,Long> updateCounts = new HashMap<>();
-
-          // gather updateCounts for each tablet
-          onlineTabletsSnapshot.forEach((ke, tablet) -> {
-            updateCounts.put(ke, tablet.getUpdateCount());
-          });
-
-          // gather metadata for all tablets readTablets()
-          try (TabletsMetadata tabletsMetadata =
-              getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
-                  .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
-            // for each tablet, compare its metadata to what is held in memory
-            tabletsMetadata.forEach(tabletMetadata -> {
-              KeyExtent extent = tabletMetadata.getExtent();
-              Tablet tablet = onlineTabletsSnapshot.get(extent);
-              Long counter = updateCounts.get(extent);
-              tablet.compareTabletInfo(counter, tabletMetadata);
-            });
+    watchCriticalFixedDelay(aconf, tabletCheckFrequency, () -> {
+      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+      Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+      // gather updateCounts for each tablet
+      onlineTabletsSnapshot.forEach((ke, tablet) -> {
+        updateCounts.put(ke, tablet.getUpdateCount());
+      });
+
+      Instant start = Instant.now();
+      Duration duration;
+      Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan");
+      try (Scope scope = mdScanSpan.makeCurrent()) {
+        // gather metadata for all tablets readTablets()
+        try (TabletsMetadata tabletsMetadata =
+            getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
+                .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+          mdScanSpan.end();
+          duration = Duration.between(start, Instant.now());
+          log.debug("Metadata scan took {}ms for {} tablets read.", duration.toMillis(),
+              onlineTabletsSnapshot.keySet().size());
+
+          // for each tablet, compare its metadata to what is held in memory
+          for (var tabletMetadata : tabletsMetadata) {
+            KeyExtent extent = tabletMetadata.getExtent();
+            Tablet tablet = onlineTabletsSnapshot.get(extent);
+            Long counter = updateCounts.get(extent);
+            tablet.compareTabletInfo(counter, tabletMetadata);
           }
-        }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES));
+        }
+      }
+    });
 
     final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
-    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+    watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
         new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
         CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS));
 
@@ -963,7 +979,7 @@ public class TabletServer extends AbstractServer {
     };
     ScheduledFuture<?> future = context.getScheduledExecutor()
         .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
+    watchNonCriticalScheduledTask(future);
   }
 
   public String getClientAddressString() {
@@ -1030,7 +1046,7 @@ public class TabletServer extends AbstractServer {
 
     ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
         0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
+    watchNonCriticalScheduledTask(future);
   }
 
   public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {