You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) was not preserved in this version.
Posted to commits@accumulo.apache.org by do...@apache.org on 2021/12/06 21:08:41 UTC

[accumulo] branch main updated: Periodically verify tablet metadata (#2320)

This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 1687ede  Periodically verify tablet metadata (#2320)
1687ede is described below

commit 1687ede211888da0dcd40323c894eff0e00fce26
Author: Dom G <47...@users.noreply.github.com>
AuthorDate: Mon Dec 6 16:08:30 2021 -0500

    Periodically verify tablet metadata (#2320)
    
    Adds a scheduled executor that periodically verifies that tablet metadata matches what is held in memory.
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../org/apache/accumulo/tserver/TabletServer.java  | 57 ++++++++++++++++++++++
 .../accumulo/tserver/tablet/DatafileManager.java   | 15 ++++--
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 31 +++++++++---
 3 files changed, 94 insertions(+), 9 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 63c40b0..96e8e66 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,6 +19,10 @@
 package org.apache.accumulo.tserver;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
@@ -51,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Durability;
@@ -68,6 +73,9 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -794,6 +802,55 @@ public class TabletServer extends AbstractServer {
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
+    int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      // An exception escaping this task would suppress all of its later scheduled runs
+      try {
+        final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+        final SortedSet<KeyExtent> userExtents = new TreeSet<>();
+        final SortedSet<KeyExtent> nonUserExtents = new TreeSet<>();
+        final Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+        // Partition tablets by DataLevel: USER tablets are read in bulk with readTablets() to
+        // reduce RPCs, while ROOT and METADATA tablets are read one at a time with readTablet().
+        // Each tablet's update count is captured before its metadata is read.
+        // TODO: Push this partitioning, based on DataLevel, to ample - accumulo issue #2373
+        onlineTabletsSnapshot.forEach((ke, tablet) -> {
+          if (Ample.DataLevel.of(ke.tableId()) == Ample.DataLevel.USER) {
+            userExtents.add(ke);
+          } else {
+            nonUserExtents.add(ke);
+          }
+          updateCounts.put(ke, tablet.getUpdateCount());
+        });
+
+        try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
+            .forTablets(userExtents).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+          Stream<TabletMetadata> userTablets = tabletsMetadata.stream();
+
+          // readTablet() may return null (e.g. tablet no longer in metadata), so drop nulls
+          Stream<TabletMetadata> nonUserTablets = nonUserExtents.stream()
+              .map(extent -> getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP,
+                  PREV_ROW))
+              .filter(tabletMetadata -> tabletMetadata != null);
+
+          // For each tablet, compare its metadata to what is held in memory. Metadata for an
+          // extent missing from the snapshot above is skipped.
+          Stream.concat(userTablets, nonUserTablets).forEach(tabletMetadata -> {
+            KeyExtent extent = tabletMetadata.getExtent();
+            Tablet tablet = onlineTabletsSnapshot.get(extent);
+            Long counter = updateCounts.get(extent);
+            if (tablet != null && counter != null) {
+              tablet.compareTabletInfo(counter, tabletMetadata);
+            }
+          });
+        }
+      } catch (RuntimeException e) {
+        log.error("Failed to verify tablet metadata against in-memory tablet data", e);
+      }
+    }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES);
+
     final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
     context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
         CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 91826a2..1068a86 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -69,10 +69,12 @@ class DatafileManager {
   // ensure we only have one reader/writer of our bulk file notes at at time
   private final Object bulkFileImportLock = new Object();
 
+  // Must be bumped on every mutation of datafileSizes so readers can detect a change
+  private long updateCount;
+
   DatafileManager(Tablet tablet, SortedMap<StoredTabletFile,DataFileValue> datafileSizes) {
-    for (Entry<StoredTabletFile,DataFileValue> datafiles : datafileSizes.entrySet()) {
-      this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
-    }
+    datafileSizes.forEach(this.datafileSizes::put);
+    updateCount = 0L;
     this.tablet = tablet;
   }
 
@@ -260,6 +262,7 @@ class DatafileManager {
         }
         datafileSizes.put(tpath.getKey(), tpath.getValue());
       }
+      updateCount++;
 
       tablet.getTabletResources().importedMapFiles();
 
@@ -386,6 +389,7 @@ class DatafileManager {
           log.error("Adding file that is already in set {}", newFileStored);
         }
         datafileSizes.put(newFileStored, dfv);
+        updateCount++;
       }
 
       tablet.flushComplete(flushId);
@@ -456,6 +460,7 @@ class DatafileManager {
         datafileSizes.put(newFile, dfv);
         // could be used by a follow on compaction in a multipass compaction
       }
+      updateCount++;
 
       tablet.computeNumEntries();
 
@@ -504,4 +509,8 @@ class DatafileManager {
     return datafileSizes.size();
   }
 
+  public long getUpdateCount() {
+    return this.updateCount; // number of datafileSizes mutations since construction
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index ac98e80..56645d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -82,6 +82,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.security.Authorizations;
@@ -1401,12 +1402,7 @@ public class Tablet {
         throw new RuntimeException(msg);
       }
 
-      if (!tabletMeta.getFilesMap().equals(getDatafileManager().getDatafileSizes())) {
-        String msg = "Data files in differ from in memory data " + extent + "  "
-            + tabletMeta.getFilesMap() + "  " + getDatafileManager().getDatafileSizes();
-        log.error(msg);
-        throw new RuntimeException(msg);
-      }
+      compareToDataInMemory(tabletMeta);
     } catch (Exception e) {
       String msg = "Failed to do close consistency check for tablet " + extent;
       log.error(msg, e);
@@ -1424,6 +1420,25 @@ public class Tablet {
     // TODO check lastFlushID and lostCompactID - ACCUMULO-1290
   }
 
+  private void compareToDataInMemory(TabletMetadata tabletMetadata) {
+    if (!tabletMetadata.getFilesMap().equals(getDatafileManager().getDatafileSizes())) {
+      String msg = "Data files in " + extent + " differ from in-memory data "
+          + tabletMetadata.getFilesMap() + " " + getDatafileManager().getDatafileSizes();
+      log.error(msg); // only logged; a mismatch is not treated as fatal here
+    }
+  }
+
+  public synchronized void compareTabletInfo(Long updateCounter, TabletMetadata tabletMetadata) {
+    // Nothing meaningful to compare for a closing/closed tablet or an unknown counter
+    if (isClosed() || isClosing() || updateCounter == null) {
+      return;
+    }
+    // A changed counter means memory moved on since the metadata was read; check next round
+    if (updateCounter.longValue() == this.getUpdateCount()) {
+      this.compareToDataInMemory(tabletMetadata);
+    }
+  }
+
   /**
    * Returns an int representing the total block size of the files served by this tablet.
    *
@@ -2234,6 +2249,10 @@ public class Tablet {
     return datafileManager;
   }
 
+  public synchronized long getUpdateCount() {
+    return this.getDatafileManager().getUpdateCount(); // data file set change counter
+  }
+
   TabletMemory getTabletMemory() {
     return tabletMemory;
   }