You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2022/01/07 01:06:49 UTC

[accumulo] branch main updated: fixes #2373 support reading multiple metadata levels in Ample (#2394)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 22cf17b  fixes #2373 support reading multiple metadata levels in Ample (#2394)
22cf17b is described below

commit 22cf17b157c191035de816bb9295a8beccba34ff
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 6 20:05:55 2022 -0500

    fixes #2373 support reading multiple metadata levels in Ample (#2394)
---
 .../core/metadata/schema/TabletsMetadata.java      | 110 ++++++++++++---------
 .../org/apache/accumulo/tserver/TabletServer.java  |  32 +-----
 2 files changed, 69 insertions(+), 73 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 8ffa08e..d82bb0a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.metadata.schema;
 
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
@@ -34,6 +35,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
@@ -78,6 +80,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
 /**
@@ -112,7 +115,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
       if (extents != null) {
         // setting multiple extents with forTablets(extents) is mutually exclusive with these
         // single-tablet options
-        checkState(range == null && table == null && level == DataLevel.USER && !checkConsistency);
+        checkState(range == null && table == null && level == null && !checkConsistency);
         return buildExtents(_client);
       }
 
@@ -128,44 +131,62 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
 
     private TabletsMetadata buildExtents(AccumuloClient client) {
 
-      try {
-        BatchScanner scanner = client.createBatchScanner(level.metaTable(), Authorizations.EMPTY);
+      Map<DataLevel,List<KeyExtent>> groupedExtents =
+          extents.stream().collect(groupingBy(ke -> DataLevel.of(ke.tableId())));
 
-        var ranges = extents.stream().map(KeyExtent::toMetaRange).collect(toList());
-        scanner.setRanges(ranges);
+      List<Iterable<TabletMetadata>> iterables = new ArrayList<>();
 
-        boolean extentsPresent = extentsToFetch != null;
+      List<AutoCloseable> closables = new ArrayList<>();
 
-        if (!fetchedCols.isEmpty() && extentsPresent)
-          fetch(ColumnType.PREV_ROW);
+      Preconditions.checkState(extentsToFetch != null);
 
-        configureColumns(scanner);
-        IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class);
-        scanner.addScanIterator(iterSetting);
-
-        Iterable<TabletMetadata> tmi = () -> {
-          Iterator<TabletMetadata> iter = Iterators.transform(scanner.iterator(), entry -> {
-            try {
-              return TabletMetadata.convertRow(WholeRowIterator
-                  .decodeRow(entry.getKey(), entry.getValue()).entrySet().iterator(), fetchedCols,
-                  saveKeyValues);
-            } catch (IOException e) {
-              throw new UncheckedIOException(e);
-            }
-          });
+      if (!fetchedCols.isEmpty())
+        fetch(ColumnType.PREV_ROW);
 
-          if (extentsPresent) {
-            return Iterators.filter(iter,
-                tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent()));
-          } else {
-            return iter;
+      for (DataLevel level : groupedExtents.keySet()) {
+        if (level == DataLevel.ROOT) {
+          iterables.add(() -> Iterators
+              .singletonIterator(getRootMetadata((ClientContext) client, readConsistency)));
+        } else {
+          try {
+            BatchScanner scanner =
+                client.createBatchScanner(level.metaTable(), Authorizations.EMPTY);
+
+            var ranges =
+                groupedExtents.get(level).stream().map(KeyExtent::toMetaRange).collect(toList());
+            scanner.setRanges(ranges);
+
+            configureColumns(scanner);
+            IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class);
+            scanner.addScanIterator(iterSetting);
+
+            Iterable<TabletMetadata> tmi = () -> Iterators.transform(scanner.iterator(), entry -> {
+              try {
+                return TabletMetadata.convertRow(WholeRowIterator
+                    .decodeRow(entry.getKey(), entry.getValue()).entrySet().iterator(), fetchedCols,
+                    saveKeyValues);
+              } catch (IOException e) {
+                throw new UncheckedIOException(e);
+              }
+            });
+
+            iterables.add(tmi);
+            closables.add(scanner);
+
+          } catch (TableNotFoundException e) {
+            throw new RuntimeException(e);
           }
-        };
 
-        return new TabletsMetadata(scanner, tmi);
-      } catch (TableNotFoundException e) {
-        throw new RuntimeException(e);
+        }
       }
+
+      return new TabletsMetadata(() -> {
+        for (AutoCloseable closable : closables) {
+          closable.close();
+        }
+      }, Iterables.filter(Iterables.concat(iterables),
+          tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent())));
+
     }
 
     private TabletsMetadata buildNonRoot(AccumuloClient client) {
@@ -321,13 +342,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
 
     @Override
     public Options forTablets(Collection<KeyExtent> extents) {
-      if (extents.stream().map(KeyExtent::tableId).map(DataLevel::of)
-          .anyMatch(dl -> dl != DataLevel.USER)) {
-        throw new IllegalArgumentException(
-            "readTablets only supported for user tablets at this time.");
-      }
-
-      this.level = DataLevel.USER;
+      this.level = null;
       this.extents = extents;
       this.extentsToFetch = Set.copyOf(extents);
       return this;
@@ -512,24 +527,31 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
         .convertToTabletMetadata();
   }
 
-  private final ScannerBase scanner;
+  private final AutoCloseable closeable;
 
   private final Iterable<TabletMetadata> tablets;
 
   private TabletsMetadata(TabletMetadata tm) {
-    this.scanner = null;
+    this.closeable = null;
     this.tablets = Collections.singleton(tm);
   }
 
-  private TabletsMetadata(ScannerBase scanner, Iterable<TabletMetadata> tmi) {
-    this.scanner = scanner;
+  private TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
+    this.closeable = closeable;
     this.tablets = tmi;
   }
 
   @Override
   public void close() {
-    if (scanner != null) {
-      scanner.close();
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (RuntimeException e) {
+        // avoid wrapping runtime w/ runtime
+        throw e;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 96e8e66..a06b349 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -55,7 +55,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Durability;
@@ -73,8 +72,6 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
@@ -807,22 +804,6 @@ public class TabletServer extends AbstractServer {
     ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
       final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
 
-      final SortedSet<KeyExtent> userExtents = new TreeSet<>();
-      final SortedSet<KeyExtent> nonUserExtents = new TreeSet<>();
-
-      // Create subsets of tablets based on DataLevel: one set who's DataLevel is USER and another
-      // containing the remaining tablets (those who's DataLevel is ROOT or METADATA).
-      // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
-      // to reduce RPCs.
-      // TODO: Push this partitioning, based on DataLevel, to ample - accumulo issue #2373
-      onlineTabletsSnapshot.forEach((ke, tablet) -> {
-        if (Ample.DataLevel.of(ke.tableId()) == Ample.DataLevel.USER) {
-          userExtents.add(ke);
-        } else {
-          nonUserExtents.add(ke);
-        }
-      });
-
       Map<KeyExtent,Long> updateCounts = new HashMap<>();
 
       // gather updateCounts for each tablet
@@ -830,19 +811,12 @@ public class TabletServer extends AbstractServer {
         updateCounts.put(ke, tablet.getUpdateCount());
       });
 
-      // gather metadata for all tablets with DataLevel.USER using readTablets()
+      // gather metadata for all tablets using readTablets()
       try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
-          .forTablets(userExtents).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
-        Stream<TabletMetadata> userTablets = tabletsMetadata.stream();
-
-        // gather metadata for all tablets with DataLevel.ROOT or METADATA using readTablet()
-        Stream<TabletMetadata> nonUserTablets = nonUserExtents.stream().flatMap(extent -> Stream
-            .of(getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP, PREV_ROW)));
+          .forTablets(onlineTabletsSnapshot.keySet()).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
 
-        // combine both streams of TabletMetadata
         // for each tablet, compare its metadata to what is held in memory
-        Stream.concat(userTablets, nonUserTablets).forEach(tabletMetadata -> {
+        tabletsMetadata.forEach(tabletMetadata -> {
           KeyExtent extent = tabletMetadata.getExtent();
           Tablet tablet = onlineTabletsSnapshot.get(extent);
           Long counter = updateCounts.get(extent);