You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2022/01/07 01:06:49 UTC
[accumulo] branch main updated: fixes #2373 support reading multiple metadata levels in Ample (#2394)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 22cf17b fixes #2373 support reading multiple metadata levels in Ample (#2394)
22cf17b is described below
commit 22cf17b157c191035de816bb9295a8beccba34ff
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 6 20:05:55 2022 -0500
fixes #2373 support reading multiple metadata levels in Ample (#2394)
---
.../core/metadata/schema/TabletsMetadata.java | 110 ++++++++++++---------
.../org/apache/accumulo/tserver/TabletServer.java | 32 +-----
2 files changed, 69 insertions(+), 73 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 8ffa08e..d82bb0a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.metadata.schema;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
@@ -34,6 +35,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
@@ -78,6 +80,7 @@ import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
/**
@@ -112,7 +115,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
if (extents != null) {
// setting multiple extents with forTablets(extents) is mutually exclusive with these
// single-tablet options
- checkState(range == null && table == null && level == DataLevel.USER && !checkConsistency);
+ checkState(range == null && table == null && level == null && !checkConsistency);
return buildExtents(_client);
}
@@ -128,44 +131,62 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
private TabletsMetadata buildExtents(AccumuloClient client) {
- try {
- BatchScanner scanner = client.createBatchScanner(level.metaTable(), Authorizations.EMPTY);
+ Map<DataLevel,List<KeyExtent>> groupedExtents =
+ extents.stream().collect(groupingBy(ke -> DataLevel.of(ke.tableId())));
- var ranges = extents.stream().map(KeyExtent::toMetaRange).collect(toList());
- scanner.setRanges(ranges);
+ List<Iterable<TabletMetadata>> iterables = new ArrayList<>();
- boolean extentsPresent = extentsToFetch != null;
+ List<AutoCloseable> closables = new ArrayList<>();
- if (!fetchedCols.isEmpty() && extentsPresent)
- fetch(ColumnType.PREV_ROW);
+ Preconditions.checkState(extentsToFetch != null);
- configureColumns(scanner);
- IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class);
- scanner.addScanIterator(iterSetting);
-
- Iterable<TabletMetadata> tmi = () -> {
- Iterator<TabletMetadata> iter = Iterators.transform(scanner.iterator(), entry -> {
- try {
- return TabletMetadata.convertRow(WholeRowIterator
- .decodeRow(entry.getKey(), entry.getValue()).entrySet().iterator(), fetchedCols,
- saveKeyValues);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
+ if (!fetchedCols.isEmpty())
+ fetch(ColumnType.PREV_ROW);
- if (extentsPresent) {
- return Iterators.filter(iter,
- tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent()));
- } else {
- return iter;
+ for (DataLevel level : groupedExtents.keySet()) {
+ if (level == DataLevel.ROOT) {
+ iterables.add(() -> Iterators
+ .singletonIterator(getRootMetadata((ClientContext) client, readConsistency)));
+ } else {
+ try {
+ BatchScanner scanner =
+ client.createBatchScanner(level.metaTable(), Authorizations.EMPTY);
+
+ var ranges =
+ groupedExtents.get(level).stream().map(KeyExtent::toMetaRange).collect(toList());
+ scanner.setRanges(ranges);
+
+ configureColumns(scanner);
+ IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class);
+ scanner.addScanIterator(iterSetting);
+
+ Iterable<TabletMetadata> tmi = () -> Iterators.transform(scanner.iterator(), entry -> {
+ try {
+ return TabletMetadata.convertRow(WholeRowIterator
+ .decodeRow(entry.getKey(), entry.getValue()).entrySet().iterator(), fetchedCols,
+ saveKeyValues);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ iterables.add(tmi);
+ closables.add(scanner);
+
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
}
- };
- return new TabletsMetadata(scanner, tmi);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
+ }
}
+
+ return new TabletsMetadata(() -> {
+ for (AutoCloseable closable : closables) {
+ closable.close();
+ }
+ }, Iterables.filter(Iterables.concat(iterables),
+ tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent())));
+
}
private TabletsMetadata buildNonRoot(AccumuloClient client) {
@@ -321,13 +342,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
@Override
public Options forTablets(Collection<KeyExtent> extents) {
- if (extents.stream().map(KeyExtent::tableId).map(DataLevel::of)
- .anyMatch(dl -> dl != DataLevel.USER)) {
- throw new IllegalArgumentException(
- "readTablets only supported for user tablets at this time.");
- }
-
- this.level = DataLevel.USER;
+ this.level = null;
this.extents = extents;
this.extentsToFetch = Set.copyOf(extents);
return this;
@@ -512,24 +527,31 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
.convertToTabletMetadata();
}
- private final ScannerBase scanner;
+ private final AutoCloseable closeable;
private final Iterable<TabletMetadata> tablets;
private TabletsMetadata(TabletMetadata tm) {
- this.scanner = null;
+ this.closeable = null;
this.tablets = Collections.singleton(tm);
}
- private TabletsMetadata(ScannerBase scanner, Iterable<TabletMetadata> tmi) {
- this.scanner = scanner;
+ private TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
+ this.closeable = closeable;
this.tablets = tmi;
}
@Override
public void close() {
- if (scanner != null) {
- scanner.close();
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (RuntimeException e) {
+ // avoid wrapping runtime w/ runtime
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 96e8e66..a06b349 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -55,7 +55,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
@@ -73,8 +72,6 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
@@ -807,22 +804,6 @@ public class TabletServer extends AbstractServer {
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
- final SortedSet<KeyExtent> userExtents = new TreeSet<>();
- final SortedSet<KeyExtent> nonUserExtents = new TreeSet<>();
-
- // Create subsets of tablets based on DataLevel: one set who's DataLevel is USER and another
- // containing the remaining tablets (those who's DataLevel is ROOT or METADATA).
- // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
- // to reduce RPCs.
- // TODO: Push this partitioning, based on DataLevel, to ample - accumulo issue #2373
- onlineTabletsSnapshot.forEach((ke, tablet) -> {
- if (Ample.DataLevel.of(ke.tableId()) == Ample.DataLevel.USER) {
- userExtents.add(ke);
- } else {
- nonUserExtents.add(ke);
- }
- });
-
Map<KeyExtent,Long> updateCounts = new HashMap<>();
// gather updateCounts for each tablet
@@ -830,19 +811,12 @@ public class TabletServer extends AbstractServer {
updateCounts.put(ke, tablet.getUpdateCount());
});
- // gather metadata for all tablets with DataLevel.USER using readTablets()
+ // gather metadata for all tablets using readTablets()
try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
- .forTablets(userExtents).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
- Stream<TabletMetadata> userTablets = tabletsMetadata.stream();
-
- // gather metadata for all tablets with DataLevel.ROOT or METADATA using readTablet()
- Stream<TabletMetadata> nonUserTablets = nonUserExtents.stream().flatMap(extent -> Stream
- .of(getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP, PREV_ROW)));
+ .forTablets(onlineTabletsSnapshot.keySet()).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
- // combine both streams of TabletMetadata
// for each tablet, compare its metadata to what is held in memory
- Stream.concat(userTablets, nonUserTablets).forEach(tabletMetadata -> {
+ tabletsMetadata.forEach(tabletMetadata -> {
KeyExtent extent = tabletMetadata.getExtent();
Tablet tablet = onlineTabletsSnapshot.get(extent);
Long counter = updateCounts.get(extent);