You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/03/24 14:05:50 UTC
[accumulo] branch main updated: Add trace and debug log to consistency check (#2583)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new dd81d60 Add trace and debug log to consistency check (#2583)
dd81d60 is described below
commit dd81d608bff88835676e756ffcb2a2f6b69c9be8
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Mar 24 10:05:44 2022 -0400
Add trace and debug log to consistency check (#2583)
* Closes #2577
* Add a trace span and timing around the tablet metadata consistency check
so we can see how long metadata scans take
* Create new property tserver.health.check.interval to make the consistency-check interval configurable
* Create new method watchCriticalFixedDelay() in ThreadPools
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../org/apache/accumulo/core/conf/Property.java | 2 +
.../accumulo/core/util/threads/ThreadPools.java | 7 ++
.../org/apache/accumulo/tserver/TabletServer.java | 80 +++++++++++++---------
3 files changed, 57 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 667d78b..06131bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -656,6 +656,8 @@ public enum Property {
"1.4.0"),
TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION,
"The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
+ TSERV_HEALTH_CHECK_FREQ("tserver.health.check.interval", "30m", PropertyType.TIMEDURATION,
+ "The time between tablet server health checks.", "2.1.0"),
TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "1.4.0"),
TSERV_MINTHREADS_TIMEOUT("tserver.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 73cb308..548852a 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -166,6 +166,13 @@ public class ThreadPools {
CRITICAL_RUNNING_TASKS.add(future);
}
+ public static void watchCriticalFixedDelay(AccumuloConfiguration aconf, long intervalMillis,
+ Runnable runnable) {
+ ScheduledFuture<?> future = getServerThreadPools().createGeneralScheduledExecutorService(aconf)
+ .scheduleWithFixedDelay(runnable, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
+ CRITICAL_RUNNING_TASKS.add(future);
+ }
+
public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) {
NON_CRITICAL_RUNNING_TASKS.add(future);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3de8187..8c33967 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -23,12 +23,17 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalFixedDelay;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchCriticalScheduledTask;
+import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.security.SecureRandom;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -164,6 +169,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
public class TabletServer extends AbstractServer {
private static final SecureRandom random = new SecureRandom();
@@ -287,7 +295,7 @@ public class TabletServer extends AbstractServer {
}
}
}), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
+ watchNonCriticalScheduledTask(future);
}
ScheduledFuture<?> future = context.getScheduledExecutor()
@@ -304,7 +312,7 @@ public class TabletServer extends AbstractServer {
}
}
}), 5, 5, TimeUnit.SECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
+ watchNonCriticalScheduledTask(future);
@SuppressWarnings("deprecation")
final long walMaxSize =
@@ -352,7 +360,7 @@ public class TabletServer extends AbstractServer {
this.resourceManager = new TabletServerResourceManager(context);
this.security = AuditedSecurityOperation.getInstance(context);
- ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS));
walMarker = new WalStateManager(context);
@@ -803,38 +811,46 @@ public class TabletServer extends AbstractServer {
}
}
}, 0, 5, TimeUnit.SECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
+ watchNonCriticalScheduledTask(future);
- int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
+ long tabletCheckFrequency = aconf.getTimeInMillis(Property.TSERV_HEALTH_CHECK_FREQ);
// Periodically check that metadata of tablets matches what is held in memory
- ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools()
- .createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
- final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
-
- Map<KeyExtent,Long> updateCounts = new HashMap<>();
-
- // gather updateCounts for each tablet
- onlineTabletsSnapshot.forEach((ke, tablet) -> {
- updateCounts.put(ke, tablet.getUpdateCount());
- });
-
- // gather metadata for all tablets readTablets()
- try (TabletsMetadata tabletsMetadata =
- getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
- .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
- // for each tablet, compare its metadata to what is held in memory
- tabletsMetadata.forEach(tabletMetadata -> {
- KeyExtent extent = tabletMetadata.getExtent();
- Tablet tablet = onlineTabletsSnapshot.get(extent);
- Long counter = updateCounts.get(extent);
- tablet.compareTabletInfo(counter, tabletMetadata);
- });
+ watchCriticalFixedDelay(aconf, tabletCheckFrequency, () -> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+ Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+ // gather updateCounts for each tablet
+ onlineTabletsSnapshot.forEach((ke, tablet) -> {
+ updateCounts.put(ke, tablet.getUpdateCount());
+ });
+
+ Instant start = Instant.now();
+ Duration duration;
+ Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan");
+ try (Scope scope = mdScanSpan.makeCurrent()) {
+ // gather metadata for all tablets readTablets()
+ try (TabletsMetadata tabletsMetadata =
+ getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
+ .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+ mdScanSpan.end();
+ duration = Duration.between(start, Instant.now());
+ log.debug("Metadata scan took {}ms for {} tablets read.", duration.toMillis(),
+ onlineTabletsSnapshot.keySet().size());
+
+ // for each tablet, compare its metadata to what is held in memory
+ for (var tabletMetadata : tabletsMetadata) {
+ KeyExtent extent = tabletMetadata.getExtent();
+ Tablet tablet = onlineTabletsSnapshot.get(extent);
+ Long counter = updateCounts.get(extent);
+ tablet.compareTabletInfo(counter, tabletMetadata);
}
- }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES));
+ }
+ }
+ });
final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
- ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS));
@@ -963,7 +979,7 @@ public class TabletServer extends AbstractServer {
};
ScheduledFuture<?> future = context.getScheduledExecutor()
.scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
+ watchNonCriticalScheduledTask(future);
}
public String getClientAddressString() {
@@ -1030,7 +1046,7 @@ public class TabletServer extends AbstractServer {
ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
+ watchNonCriticalScheduledTask(future);
}
public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {