You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2021/12/06 21:08:41 UTC
[accumulo] branch main updated: Periodically verify tablet metadata (#2320)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 1687ede Periodically verify tablet metadata (#2320)
1687ede is described below
commit 1687ede211888da0dcd40323c894eff0e00fce26
Author: Dom G <47...@users.noreply.github.com>
AuthorDate: Mon Dec 6 16:08:30 2021 -0500
Periodically verify tablet metadata (#2320)
Adds a scheduled executor that periodically verifies that the metadata of tablets matches what is held in memory.
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../org/apache/accumulo/tserver/TabletServer.java | 57 ++++++++++++++++++++++
.../accumulo/tserver/tablet/DatafileManager.java | 15 ++++--
.../org/apache/accumulo/tserver/tablet/Tablet.java | 31 +++++++++---
3 files changed, 94 insertions(+), 9 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 63c40b0..96e8e66 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,6 +19,10 @@
package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.IOException;
@@ -51,6 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
@@ -68,6 +73,9 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -794,6 +802,55 @@ public class TabletServer extends AbstractServer {
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+    int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      // scheduleWithFixedDelay suppresses all later runs if one run throws, so catch and log here
+      try {
+        final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+        final SortedSet<KeyExtent> userExtents = new TreeSet<>();
+        final SortedSet<KeyExtent> nonUserExtents = new TreeSet<>();
+        final Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+        // Split tablets by DataLevel: USER tablets in one set and the rest (ROOT or METADATA)
+        // in another, so the USER tablets can be read with one readTablets() call to reduce
+        // RPCs. Each tablet's update count is captured in the same pass.
+        // TODO: Push this partitioning, based on DataLevel, to ample - accumulo issue #2373
+        onlineTabletsSnapshot.forEach((ke, tablet) -> {
+          if (Ample.DataLevel.of(ke.tableId()) == Ample.DataLevel.USER) {
+            userExtents.add(ke);
+          } else {
+            nonUserExtents.add(ke);
+          }
+          updateCounts.put(ke, tablet.getUpdateCount());
+        });
+
+        // gather metadata for all tablets with DataLevel.USER using readTablets()
+        try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
+            .forTablets(userExtents).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+          // ROOT/METADATA tablets are read one at a time; defensively skip any null result
+          Stream<TabletMetadata> nonUserTablets = nonUserExtents.stream()
+              .map(extent -> getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP,
+                  PREV_ROW))
+              .filter(tabletMetadata -> tabletMetadata != null);
+
+          // for each tablet, compare its metadata to what is held in memory
+          Stream.concat(tabletsMetadata.stream(), nonUserTablets).forEach(tabletMetadata -> {
+            KeyExtent extent = tabletMetadata.getExtent();
+            Tablet tablet = onlineTabletsSnapshot.get(extent);
+            Long counter = updateCounts.get(extent);
+            // skip metadata whose extent is not in the snapshot (metadata changed since then)
+            if (tablet != null && counter != null) {
+              tablet.compareTabletInfo(counter, tabletMetadata);
+            }
+          });
+        }
+      } catch (RuntimeException e) {
+        log.error("Failed to verify tablet metadata against in-memory data", e);
+      }
+    }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES);
+
final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 91826a2..1068a86 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -69,10 +69,12 @@ class DatafileManager {
// ensure we only have one reader/writer of our bulk file notes at at time
private final Object bulkFileImportLock = new Object();
+ // This must be incremented whenever datafileSizes is mutated
+ private long updateCount;
+
DatafileManager(Tablet tablet, SortedMap<StoredTabletFile,DataFileValue> datafileSizes) {
- for (Entry<StoredTabletFile,DataFileValue> datafiles : datafileSizes.entrySet()) {
- this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
- }
+ this.datafileSizes.putAll(datafileSizes);
+ this.updateCount = 0L;
this.tablet = tablet;
}
@@ -260,6 +262,7 @@ class DatafileManager {
}
datafileSizes.put(tpath.getKey(), tpath.getValue());
}
+ updateCount++;
tablet.getTabletResources().importedMapFiles();
@@ -386,6 +389,7 @@ class DatafileManager {
log.error("Adding file that is already in set {}", newFileStored);
}
datafileSizes.put(newFileStored, dfv);
+ updateCount++;
}
tablet.flushComplete(flushId);
@@ -456,6 +460,7 @@ class DatafileManager {
datafileSizes.put(newFile, dfv);
// could be used by a follow on compaction in a multipass compaction
}
+ updateCount++;
tablet.computeNumEntries();
@@ -504,4 +509,8 @@ class DatafileManager {
return datafileSizes.size();
}
+ public long getUpdateCount() {
+ return updateCount; // counter that must be incremented whenever datafileSizes is mutated
+ }
+
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index ac98e80..56645d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -82,6 +82,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.security.Authorizations;
@@ -1401,12 +1402,7 @@ public class Tablet {
throw new RuntimeException(msg);
}
- if (!tabletMeta.getFilesMap().equals(getDatafileManager().getDatafileSizes())) {
- String msg = "Data files in differ from in memory data " + extent + " "
- + tabletMeta.getFilesMap() + " " + getDatafileManager().getDatafileSizes();
- log.error(msg);
- throw new RuntimeException(msg);
- }
+ compareToDataInMemory(tabletMeta);
} catch (Exception e) {
String msg = "Failed to do close consistency check for tablet " + extent;
log.error(msg, e);
@@ -1424,6 +1420,25 @@ public class Tablet {
// TODO check lastFlushID and lostCompactID - ACCUMULO-1290
}
+  private void compareToDataInMemory(TabletMetadata tabletMetadata) {
+    // Read the in-memory file map once so the map that is logged is the one that was compared.
+    Map<StoredTabletFile,DataFileValue> inMemoryFiles = getDatafileManager().getDatafileSizes();
+    if (!tabletMetadata.getFilesMap().equals(inMemoryFiles)) {
+      log.error("Data files in {} differ from in-memory data {} {}", extent,
+          tabletMetadata.getFilesMap(), inMemoryFiles);
+    }
+  }
+
+  // Compares tabletMetadata to the in-memory data, but only while the tablet is open and only
+  // if no data file update happened since updateCounter was captured; otherwise the comparison
+  // is skipped and a later run tries again. A null updateCounter also skips the comparison.
+  public synchronized void compareTabletInfo(Long updateCounter, TabletMetadata tabletMetadata) {
+    if (!isClosed() && !isClosing() && updateCounter != null
+        && updateCounter.longValue() == getUpdateCount()) {
+      compareToDataInMemory(tabletMetadata);
+    }
+  }
+
/**
* Returns an int representing the total block size of the files served by this tablet.
*
@@ -2234,6 +2249,10 @@ public class Tablet {
return datafileManager;
}
+ public synchronized long getUpdateCount() {
+ return getDatafileManager().getUpdateCount(); // delegates to the DatafileManager's counter
+ }
+
TabletMemory getTabletMemory() {
return tabletMemory;
}