You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/12/10 18:03:58 UTC
[accumulo] branch master updated: Improved tablet metadata abstraction layer and used it more. (#797)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 5484fa8 Improved tablet metadata abstraction layer and used it more. (#797)
5484fa8 is described below
commit 5484fa80022c52784d4000cdd422941fddec1441
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Dec 10 13:03:54 2018 -0500
Improved tablet metadata abstraction layer and used it more. (#797)
---
.../core/clientImpl/ConcurrentKeyExtentCache.java | 6 +-
.../accumulo/core/clientImpl/OfflineIterator.java | 76 ++--
.../core/clientImpl/TableOperationsImpl.java | 80 ++--
.../core/metadata/schema/DataFileValue.java | 8 +-
.../core/metadata/schema/MetadataSchema.java | 18 +-
.../core/metadata/schema/TabletMetadata.java | 142 +++++--
.../{MetadataScanner.java => TabletsMetadata.java} | 406 ++++++++++++---------
.../org/apache/accumulo/core/summary/Gatherer.java | 13 +-
.../accumulo/core/tabletserver/log/LogEntry.java | 12 +-
.../java/org/apache/accumulo/core/util/Merge.java | 58 +--
.../core/clientImpl/TableOperationsImplTest.java | 76 ----
.../server/master/balancer/GroupBalancer.java | 68 +---
.../server/master/state/TServerInstance.java | 5 +
.../accumulo/server/util/MetadataTableUtil.java | 46 +--
.../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +-
.../master/MasterClientServiceHandler.java | 98 ++---
.../master/tableOps/bulkVer2/LoadFiles.java | 8 +-
.../master/tableOps/bulkVer2/PrepBulkImport.java | 8 +-
.../master/tableOps/compact/CompactionDriver.java | 80 +---
.../accumulo/test/functional/BulkLoadIT.java | 8 +-
20 files changed, 519 insertions(+), 704 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConcurrentKeyExtentCache.java
index 6c02d63..90093b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConcurrentKeyExtentCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConcurrentKeyExtentCache.java
@@ -34,8 +34,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.BulkImport.KeyExtentCache;
import org.apache.accumulo.core.clientImpl.Table.ID;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.hadoop.io.Text;
import com.google.common.annotations.VisibleForTesting;
@@ -85,8 +85,8 @@ class ConcurrentKeyExtentCache implements KeyExtentCache {
@VisibleForTesting
protected Stream<KeyExtent> lookupExtents(Text row)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- return MetadataScanner.builder().from(ctx).scanMetadataTable().overRange(tableId, row, null)
- .checkConsistency().fetchPrev().build().stream().limit(100).map(TabletMetadata::getExtent);
+ return TabletsMetadata.builder().forTable(tableId).overlapping(row, null).checkConsistency()
+ .fetchPrev().build(ctx).stream().limit(100).map(TabletMetadata::getExtent);
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index afd9393..6f09508 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -30,9 +30,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.SampleNotPresentException;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -53,15 +52,14 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
@@ -213,7 +211,8 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
}
}
- private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
+ private void nextTablet()
+ throws TableNotFoundException, AccumuloException, IOException, AccumuloSecurityException {
Range nextRange = null;
@@ -225,8 +224,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
else
startRow = new Text();
- nextRange = new Range(new KeyExtent(tableId, startRow, null).getMetadataEntry(), true, null,
- false);
+ nextRange = new Range(TabletsSection.getRow(tableId, startRow), true, null, false);
} else {
if (currentExtent.getEndRow() == null) {
@@ -242,32 +240,30 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
}
- List<String> relFiles = new ArrayList<>();
+ TabletMetadata tablet = getTabletFiles(nextRange);
- Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-
- while (eloc.getSecond() != null) {
+ while (tablet.getLocation() != null) {
if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
Tables.clearCache(context);
if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
throw new AccumuloException("Table is online " + tableId
- + " cannot scan tablet in offline mode " + eloc.getFirst());
+ + " cannot scan tablet in offline mode " + tablet.getExtent());
}
}
sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
- eloc = getTabletFiles(nextRange, relFiles);
+ tablet = getTabletFiles(nextRange);
}
- KeyExtent extent = eloc.getFirst();
-
- if (!extent.getTableId().equals(tableId)) {
- throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
+ if (!tablet.getExtent().getTableId().equals(tableId)) {
+ throw new AccumuloException(
+ " did not find tablets for table " + tableId + " " + tablet.getExtent());
}
- if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
- throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
+ if (currentExtent != null && !tablet.getExtent().isPreviousExtent(currentExtent))
+ throw new AccumuloException(
+ " " + currentExtent + " is not previous extent " + tablet.getExtent());
// Old property is only used to resolve relative paths into absolute paths. For systems upgraded
// with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the
@@ -276,7 +272,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR;
List<String> absFiles = new ArrayList<>();
- for (String relPath : relFiles) {
+ for (String relPath : tablet.getFiles()) {
if (relPath.contains(":")) {
absFiles.add(relPath);
} else {
@@ -289,44 +285,20 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
}
}
- iter = createIterator(extent, absFiles);
+ iter = createIterator(tablet.getExtent(), absFiles);
iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns),
options.fetchedColumns.size() != 0);
- currentExtent = extent;
+ currentExtent = tablet.getExtent();
}
- private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles)
- throws TableNotFoundException {
- Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.setBatchSize(100);
- scanner.setRange(nextRange);
-
- RowIterator rowIter = new RowIterator(scanner);
- Iterator<Entry<Key,Value>> row = rowIter.next();
-
- KeyExtent extent = null;
- String location = null;
-
- while (row.hasNext()) {
- Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
-
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- relFiles.add(key.getColumnQualifier().toString());
- }
-
- if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
- || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
- location = entry.getValue().toString();
- }
-
- if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
- extent = new KeyExtent(key.getRow(), entry.getValue());
- }
+ private TabletMetadata getTabletFiles(Range nextRange)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ try (TabletsMetadata tablets = TabletsMetadata.builder().scanMetadataTable()
+ .overRange(nextRange).fetchFiles().fetchLocation().fetchPrev().build(client)) {
+ return tablets.iterator().next();
}
- return new Pair<>(extent, location);
}
private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index a5e5f1a..c4dc693 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -37,7 +37,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -62,11 +61,9 @@ import java.util.zip.ZipInputStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableExistsException;
@@ -93,10 +90,8 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TabletId;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
@@ -112,7 +107,10 @@ import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataServicer;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
@@ -1191,13 +1189,8 @@ public class TableOperationsImpl extends TableOperationsHelper {
else
range = new Range(startRow, lastRow);
- String metaTable = MetadataTable.NAME;
- if (tableId.equals(MetadataTable.ID))
- metaTable = RootTable.NAME;
-
- Scanner scanner = createMetadataScanner(metaTable, range);
-
- RowIterator rowIter = new RowIterator(scanner);
+ TabletsMetadata tablets = TabletsMetadata.builder().scanMetadataTable().overRange(range)
+ .fetchLocation().fetchPrev().build(context);
KeyExtent lastExtent = null;
@@ -1207,51 +1200,34 @@ public class TableOperationsImpl extends TableOperationsHelper {
Text continueRow = null;
MapCounter<String> serverCounts = new MapCounter<>();
- while (rowIter.hasNext()) {
- Iterator<Entry<Key,Value>> row = rowIter.next();
-
+ for (TabletMetadata tablet : tablets) {
total++;
- KeyExtent extent = null;
- String future = null;
- String current = null;
-
- while (row.hasNext()) {
- Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
-
- if (key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME))
- future = entry.getValue().toString();
-
- if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME))
- current = entry.getValue().toString();
-
- if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key))
- extent = new KeyExtent(key.getRow(), entry.getValue());
- }
+ Location loc = tablet.getLocation();
- if ((expectedState == TableState.ONLINE && current == null)
- || (expectedState == TableState.OFFLINE && (future != null || current != null))) {
+ if ((expectedState == TableState.ONLINE
+ && (loc == null || loc.getType() == LocationType.FUTURE))
+ || (expectedState == TableState.OFFLINE && loc != null)) {
if (continueRow == null)
- continueRow = extent.getMetadataEntry();
+ continueRow = tablet.getExtent().getMetadataEntry();
waitFor++;
- lastRow = extent.getMetadataEntry();
+ lastRow = tablet.getExtent().getMetadataEntry();
- if (current != null)
- serverCounts.increment(current, 1);
- if (future != null)
- serverCounts.increment(future, 1);
+ if (loc != null) {
+ serverCounts.increment(loc.getId(), 1);
+ }
}
- if (!extent.getTableId().equals(tableId)) {
- throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+ if (!tablet.getExtent().getTableId().equals(tableId)) {
+ throw new AccumuloException(
+ "Saw unexpected table Id " + tableId + " " + tablet.getExtent());
}
- if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+ if (lastExtent != null && !tablet.getExtent().isPreviousExtent(lastExtent)) {
holes++;
}
- lastExtent = extent;
+ lastExtent = tablet.getExtent();
}
if (continueRow != null) {
@@ -1283,20 +1259,6 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
- /**
- * Create an IsolatedScanner over the given table, fetching the columns necessary to determine
- * when a table has transitioned to online or offline.
- */
- protected IsolatedScanner createMetadataScanner(String metaTable, Range range)
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- Scanner scanner = context.getClient().createScanner(metaTable, Authorizations.EMPTY);
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
- scanner.setRange(range);
- return new IsolatedScanner(scanner);
- }
-
@Override
public void offline(String tableName)
throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
index 5f1379d..6e3ddf2 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
@@ -37,8 +37,8 @@ public class DataFileValue {
this.time = -1;
}
- public DataFileValue(byte[] encodedDFV) {
- String[] ba = new String(encodedDFV, UTF_8).split(",");
+ public DataFileValue(String encodedDFV) {
+ String[] ba = encodedDFV.split(",");
size = Long.parseLong(ba[0]);
numEntries = Long.parseLong(ba[1]);
@@ -49,6 +49,10 @@ public class DataFileValue {
time = -1;
}
+ public DataFileValue(byte[] encodedDFV) {
+ this(new String(encodedDFV, UTF_8));
+ }
+
public long getSize() {
return size;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 798f78a..0cb8d49 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -81,7 +81,8 @@ public class MetadataSchema {
* README : very important that prevRow sort last to avoid race conditions between garbage
* collector and split this needs to sort after everything else for that tablet
*/
- public static final ColumnFQ PREV_ROW_COLUMN = new ColumnFQ(NAME, new Text("~pr"));
+ public static final String PREV_ROW_QUAL = "~pr";
+ public static final ColumnFQ PREV_ROW_COLUMN = new ColumnFQ(NAME, new Text(PREV_ROW_QUAL));
/**
* A temporary field in case a split fails and we need to roll back
*/
@@ -101,24 +102,29 @@ public class MetadataSchema {
/**
* Holds the location of the tablet in the DFS file system
*/
- public static final ColumnFQ DIRECTORY_COLUMN = new ColumnFQ(NAME, new Text("dir"));
+ public static final String DIRECTORY_QUAL = "dir";
+ public static final ColumnFQ DIRECTORY_COLUMN = new ColumnFQ(NAME, new Text(DIRECTORY_QUAL));
/**
* Holds the {@link TimeType}
*/
- public static final ColumnFQ TIME_COLUMN = new ColumnFQ(NAME, new Text("time"));
+ public static final String TIME_QUAL = "time";
+ public static final ColumnFQ TIME_COLUMN = new ColumnFQ(NAME, new Text(TIME_QUAL));
/**
* Holds flush IDs to enable waiting on a flush to complete
*/
- public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text("flush"));
+ public static final String FLUSH_QUAL = "flush";
+ public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text(FLUSH_QUAL));
/**
* Holds compact IDs to enable waiting on a compaction to complete
*/
- public static final ColumnFQ COMPACT_COLUMN = new ColumnFQ(NAME, new Text("compact"));
+ public static final String COMPACT_QUAL = "compact";
+ public static final ColumnFQ COMPACT_COLUMN = new ColumnFQ(NAME, new Text(COMPACT_QUAL));
/**
* Holds lock IDs to enable a sanity check to ensure that the TServer writing to the metadata
* tablet is not dead
*/
- public static final ColumnFQ LOCK_COLUMN = new ColumnFQ(NAME, new Text("lock"));
+ public static final String LOCK_QUAL = "lock";
+ public static final ColumnFQ LOCK_COLUMN = new ColumnFQ(NAME, new Text(LOCK_QUAL));
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index 6bae677..92f7720 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -17,15 +17,20 @@
package org.apache.accumulo.core.metadata.schema;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_QUAL;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_QUAL;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_QUAL;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.Function;
@@ -44,9 +49,11 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.hadoop.io.Text;
@@ -54,6 +61,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterators;
@@ -65,7 +73,7 @@ public class TabletMetadata {
private boolean sawPrevEndRow = false;
private Text endRow;
private Location location;
- private List<String> files;
+ private Map<String,DataFileValue> files;
private List<String> scans;
private Set<String> loadedFiles;
private EnumSet<FetchedColumns> fetchedCols;
@@ -75,13 +83,16 @@ public class TabletMetadata {
private String time;
private String cloned;
private SortedMap<Key,Value> keyValues;
+ private OptionalLong flush = OptionalLong.empty();
+ private List<LogEntry> logs;
+ private OptionalLong compact = OptionalLong.empty();
public static enum LocationType {
CURRENT, FUTURE, LAST
}
public static enum FetchedColumns {
- LOCATION, PREV_ROW, FILES, LAST, LOADED, SCANS, DIR, TIME, CLONED
+ LOCATION, PREV_ROW, FILES, LAST, LOADED, SCANS, DIR, TIME, CLONED, FLUSH_ID, LOGS, COMPACT_ID
}
public static class Location {
@@ -103,9 +114,21 @@ public class TabletMetadata {
return session;
}
- public LocationType getLocationType() {
+ public LocationType getType() {
return lt;
}
+
+ @Override
+ public String toString() {
+ return server + ":" + session + ":" + lt;
+ }
+
+ /**
+ * @return an ID composed of host, port, and session id.
+ */
+ public String getId() {
+ return server + "[" + session + "]";
+ }
}
public Table.ID getTableId() {
@@ -125,6 +148,9 @@ public class TabletMetadata {
public Text getPrevEndRow() {
ensureFetched(FetchedColumns.PREV_ROW);
+ if (!sawPrevEndRow)
+ throw new IllegalStateException(
+ "No prev endrow seen. tableId: " + tableId + " endrow: " + endRow);
return prevEndRow;
}
@@ -142,6 +168,11 @@ public class TabletMetadata {
return location;
}
+ public boolean hasCurrent() {
+ ensureFetched(FetchedColumns.LOCATION);
+ return location != null && location.getType() == LocationType.CURRENT;
+ }
+
public Set<String> getLoaded() {
ensureFetched(FetchedColumns.LOADED);
return loadedFiles;
@@ -152,11 +183,21 @@ public class TabletMetadata {
return last;
}
- public List<String> getFiles() {
+ public Collection<String> getFiles() {
+ ensureFetched(FetchedColumns.FILES);
+ return files.keySet();
+ }
+
+ public Map<String,DataFileValue> getFilesMap() {
ensureFetched(FetchedColumns.FILES);
return files;
}
+ public Collection<LogEntry> getLogs() {
+ ensureFetched(FetchedColumns.LOGS);
+ return logs;
+ }
+
public List<String> getScans() {
ensureFetched(FetchedColumns.SCANS);
return scans;
@@ -177,6 +218,16 @@ public class TabletMetadata {
return cloned;
}
+ public OptionalLong getFlushId() {
+ ensureFetched(FetchedColumns.FLUSH_ID);
+ return flush;
+ }
+
+ public OptionalLong getCompactId() {
+ ensureFetched(FetchedColumns.COMPACT_ID);
+ return compact;
+ }
+
public SortedMap<Key,Value> getKeyValues() {
Preconditions.checkState(keyValues != null, "Requested key values when it was not saved");
return keyValues;
@@ -192,76 +243,79 @@ public class TabletMetadata {
kvBuilder = ImmutableSortedMap.naturalOrder();
}
- Builder<String> filesBuilder = ImmutableList.builder();
+ ImmutableMap.Builder<String,DataFileValue> filesBuilder = ImmutableMap.builder();
Builder<String> scansBuilder = ImmutableList.builder();
+ Builder<LogEntry> logsBuilder = ImmutableList.builder();
final ImmutableSet.Builder<String> loadedFilesBuilder = ImmutableSet.builder();
ByteSequence row = null;
while (rowIter.hasNext()) {
Entry<Key,Value> kv = rowIter.next();
- Key k = kv.getKey();
- Value v = kv.getValue();
- Text fam = k.getColumnFamily();
+ Key key = kv.getKey();
+ String val = kv.getValue().toString();
+ String fam = key.getColumnFamilyData().toString();
+ String qual = key.getColumnQualifierData().toString();
if (buildKeyValueMap) {
- kvBuilder.put(k, v);
+ kvBuilder.put(key, kv.getValue());
}
if (row == null) {
- row = k.getRowData();
- KeyExtent ke = new KeyExtent(k.getRow(), (Text) null);
+ row = key.getRowData();
+ KeyExtent ke = new KeyExtent(key.getRow(), (Text) null);
te.endRow = ke.getEndRow();
te.tableId = ke.getTableId();
- } else if (!row.equals(k.getRowData())) {
+ } else if (!row.equals(key.getRowData())) {
throw new IllegalArgumentException(
- "Input contains more than one row : " + row + " " + k.getRowData());
+ "Input contains more than one row : " + row + " " + key.getRowData());
}
switch (fam.toString()) {
case TabletColumnFamily.STR_NAME:
- if (PREV_ROW_COLUMN.hasColumns(k)) {
- te.prevEndRow = KeyExtent.decodePrevEndRow(v);
+ if (PREV_ROW_QUAL.equals(qual)) {
+ te.prevEndRow = KeyExtent.decodePrevEndRow(kv.getValue());
te.sawPrevEndRow = true;
}
break;
case ServerColumnFamily.STR_NAME:
- if (DIRECTORY_COLUMN.hasColumns(k)) {
- te.dir = v.toString();
- } else if (TIME_COLUMN.hasColumns(k)) {
- te.time = v.toString();
+ switch (qual) {
+ case DIRECTORY_QUAL:
+ te.dir = val;
+ break;
+ case TIME_QUAL:
+ te.time = val;
+ break;
+ case FLUSH_QUAL:
+ te.flush = OptionalLong.of(Long.parseLong(val));
+ break;
+ case COMPACT_QUAL:
+ te.compact = OptionalLong.of(Long.parseLong(val));
+ break;
}
break;
case DataFileColumnFamily.STR_NAME:
- filesBuilder.add(k.getColumnQualifier().toString());
+ filesBuilder.put(qual, new DataFileValue(val));
break;
case BulkFileColumnFamily.STR_NAME:
- loadedFilesBuilder.add(k.getColumnQualifier().toString());
+ loadedFilesBuilder.add(qual);
break;
case CurrentLocationColumnFamily.STR_NAME:
- if (te.location != null) {
- throw new IllegalArgumentException(
- "Input contains more than one location " + te.location + " " + v);
- }
- te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.CURRENT);
+ te.setLocationOnce(val, qual, LocationType.CURRENT);
break;
case FutureLocationColumnFamily.STR_NAME:
- if (te.location != null) {
- throw new IllegalArgumentException(
- "Input contains more than one location " + te.location + " " + v);
- }
- te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.FUTURE);
+ te.setLocationOnce(val, qual, LocationType.FUTURE);
break;
case LastLocationColumnFamily.STR_NAME:
- te.last = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.LAST);
+ te.last = new Location(val, qual, LocationType.LAST);
break;
case ScanFileColumnFamily.STR_NAME:
- scansBuilder.add(k.getColumnQualifierData().toString());
+ scansBuilder.add(qual);
break;
case ClonedColumnFamily.STR_NAME:
- te.cloned = v.toString();
+ te.cloned = val;
+ break;
+ case LogColumnFamily.STR_NAME:
+ logsBuilder.add(LogEntry.fromKeyValue(key, val));
break;
default:
throw new IllegalStateException("Unexpected family " + fam);
@@ -272,12 +326,20 @@ public class TabletMetadata {
te.loadedFiles = loadedFilesBuilder.build();
te.fetchedCols = fetchedColumns;
te.scans = scansBuilder.build();
+ te.logs = logsBuilder.build();
if (buildKeyValueMap) {
te.keyValues = kvBuilder.build();
}
return te;
}
+ private void setLocationOnce(String val, String qual, LocationType lt) {
+ if (location != null)
+ throw new IllegalStateException("Attempted to set second location for tableId: " + tableId
+ + " endrow: " + endRow + " -- " + location + " " + qual + " " + val);
+ location = new Location(val, qual, lt);
+ }
+
static Iterable<TabletMetadata> convert(Scanner input, EnumSet<FetchedColumns> fetchedColumns,
boolean checkConsistency, boolean buildKeyValueMap) {
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
similarity index 61%
rename from core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
rename to core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 6434de0..1ae57a7 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -17,7 +17,9 @@
package org.apache.accumulo.core.metadata.schema;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
@@ -30,8 +32,6 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -49,111 +49,101 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.FetchedColumns;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
-public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable {
+import com.google.common.base.Preconditions;
- public interface SourceOptions {
- TableOptions from(ClientContext ctx);
-
- TableOptions from(AccumuloClient client);
- }
-
- public interface TableOptions {
- RangeOptions scanRootTable();
-
- RangeOptions scanMetadataTable();
-
- RangeOptions scanTable(String tableName);
- }
-
- public interface RangeOptions {
- Options overTabletRange();
-
- Options overRange(Range range);
-
- Options overRange(Table.ID tableId);
-
- Options overRange(Table.ID tableId, Text startRow, Text endRow);
- }
-
- public interface Options {
- /**
- * Checks that the metadata table forms a linked list and automatically backs up until it does.
- */
- Options checkConsistency();
-
- /**
- * Saves the key values seen in the metadata table for each tablet.
- */
- Options saveKeyValues();
-
- Options fetchFiles();
-
- Options fetchLoaded();
-
- Options fetchLocation();
-
- Options fetchPrev();
-
- Options fetchLast();
-
- Options fetchScans();
-
- Options fetchDir();
-
- Options fetchTime();
-
- Options fetchCloned();
-
- MetadataScanner build()
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
- }
+/**
+ * An abstraction layer for reading tablet metadata from the accumulo.metadata and accumulo.root
+ * tables.
+ */
+public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable {
- private static class TabletMetadataIterator implements Iterator<TabletMetadata> {
+ private static class Builder implements TableRangeOptions, TableOptions, RangeOptions, Options {
- private boolean sawLast = false;
- private Iterator<TabletMetadata> iter;
+ private List<Text> families = new ArrayList<>();
+ private List<ColumnFQ> qualifiers = new ArrayList<>();
+ private String table = MetadataTable.NAME;
+ private Range range;
+ private EnumSet<FetchedColumns> fetchedCols = EnumSet.noneOf(FetchedColumns.class);
private Text endRow;
+ private boolean checkConsistency = false;
+ private boolean saveKeyValues;
+ private ID tableId;
- TabletMetadataIterator(Iterator<TabletMetadata> source, Text endRow) {
- this.iter = source;
- this.endRow = endRow;
+ @Override
+ public TabletsMetadata build(ClientContext ctx) {
+ return build(ctx.getClient());
}
@Override
- public boolean hasNext() {
- return !sawLast && iter.hasNext();
+ public TabletsMetadata build(AccumuloClient client) {
+ try {
+ Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY));
+ scanner.setRange(range);
+
+ if (checkConsistency && !fetchedCols.contains(FetchedColumns.PREV_ROW)) {
+ fetchPrev();
+ }
+
+ for (Text fam : families) {
+ scanner.fetchColumnFamily(fam);
+ }
+
+ for (ColumnFQ col : qualifiers) {
+ col.fetch(scanner);
+ }
+
+ if (families.size() == 0 && qualifiers.size() == 0) {
+ fetchedCols = EnumSet.allOf(FetchedColumns.class);
+ }
+
+ Iterable<TabletMetadata> tmi = TabletMetadata.convert(scanner, fetchedCols,
+ checkConsistency, saveKeyValues);
+
+ if (endRow != null) {
+ // create an iterable that will stop at the tablet which contains the endRow
+ return new TabletsMetadata(scanner,
+ () -> new TabletMetadataIterator(tmi.iterator(), endRow));
+ } else {
+ return new TabletsMetadata(scanner, tmi);
+ }
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public TabletMetadata next() {
- if (sawLast) {
- throw new NoSuchElementException();
- }
- TabletMetadata next = iter.next();
- if (next.getExtent().contains(endRow)) {
- sawLast = true;
- }
- return next;
+ public Options checkConsistency() {
+ this.checkConsistency = true;
+ return this;
}
- }
- private static class Builder implements SourceOptions, TableOptions, RangeOptions, Options {
+ @Override
+ public Options fetchCloned() {
+ fetchedCols.add(FetchedColumns.CLONED);
+ families.add(ClonedColumnFamily.NAME);
+ return this;
+ }
- private List<Text> families = new ArrayList<>();
- private List<ColumnFQ> qualifiers = new ArrayList<>();
- private AccumuloClient client;
- private String table = MetadataTable.NAME;
- private Range range;
- private EnumSet<FetchedColumns> fetchedCols = EnumSet.noneOf(FetchedColumns.class);
- private Text endRow;
- private boolean checkConsistency = false;
- private boolean saveKeyValues;
+ @Override
+ public Options fetchCompactId() {
+ fetchedCols.add(FetchedColumns.COMPACT_ID);
+ qualifiers.add(COMPACT_COLUMN);
+ return this;
+ }
+
+ @Override
+ public Options fetchDir() {
+ fetchedCols.add(FetchedColumns.DIR);
+ qualifiers.add(DIRECTORY_COLUMN);
+ return this;
+ }
@Override
public Options fetchFiles() {
@@ -163,9 +153,16 @@ public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable
}
@Override
- public Options fetchScans() {
- fetchedCols.add(FetchedColumns.SCANS);
- families.add(ScanFileColumnFamily.NAME);
+ public Options fetchFlushId() {
+ fetchedCols.add(FetchedColumns.FLUSH_ID);
+ qualifiers.add(FLUSH_COLUMN);
+ return this;
+ }
+
+ @Override
+ public Options fetchLast() {
+ fetchedCols.add(FetchedColumns.LAST);
+ families.add(LastLocationColumnFamily.NAME);
return this;
}
@@ -185,23 +182,23 @@ public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable
}
@Override
- public Options fetchPrev() {
- fetchedCols.add(FetchedColumns.PREV_ROW);
- qualifiers.add(PREV_ROW_COLUMN);
+ public Options fetchLogs() {
+ fetchedCols.add(FetchedColumns.LOGS);
+ families.add(LogColumnFamily.NAME);
return this;
}
@Override
- public Options fetchDir() {
- fetchedCols.add(FetchedColumns.DIR);
- qualifiers.add(DIRECTORY_COLUMN);
+ public Options fetchPrev() {
+ fetchedCols.add(FetchedColumns.PREV_ROW);
+ qualifiers.add(PREV_ROW_COLUMN);
return this;
}
@Override
- public Options fetchLast() {
- fetchedCols.add(FetchedColumns.LAST);
- families.add(LastLocationColumnFamily.NAME);
+ public Options fetchScans() {
+ fetchedCols.add(FetchedColumns.SCANS);
+ families.add(ScanFileColumnFamily.NAME);
return this;
}
@@ -213,62 +210,38 @@ public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable
}
@Override
- public Options fetchCloned() {
- fetchedCols.add(FetchedColumns.CLONED);
- families.add(ClonedColumnFamily.NAME);
- return this;
- }
-
- @Override
- public MetadataScanner build()
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-
- Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY));
- scanner.setRange(range);
-
- if (checkConsistency && !fetchedCols.contains(FetchedColumns.PREV_ROW)) {
- fetchPrev();
- }
-
- for (Text fam : families) {
- scanner.fetchColumnFamily(fam);
- }
-
- for (ColumnFQ col : qualifiers) {
- col.fetch(scanner);
- }
-
- if (families.size() == 0 && qualifiers.size() == 0) {
- fetchedCols = EnumSet.allOf(FetchedColumns.class);
+ public TableRangeOptions forTable(Table.ID tableId) {
+ Preconditions.checkArgument(!tableId.equals(RootTable.ID),
+ "Getting tablet metadata for " + RootTable.NAME + " not supported at this time.");
+ if (tableId.equals(MetadataTable.ID)) {
+ this.table = RootTable.NAME;
+ } else {
+ this.table = MetadataTable.NAME;
}
- Iterable<TabletMetadata> tmi = TabletMetadata.convert(scanner, fetchedCols, checkConsistency,
- saveKeyValues);
+ this.tableId = tableId;
+ this.range = TabletsSection.getRange(tableId);
- if (endRow != null) {
- // create an iterable that will stop at the tablet which contains the endRow
- return new MetadataScanner(scanner,
- () -> new TabletMetadataIterator(tmi.iterator(), endRow));
- } else {
- return new MetadataScanner(scanner, tmi);
- }
+ return this;
}
@Override
- public TableOptions from(ClientContext ctx) {
- this.client = ctx.getClient();
+ public Options forTablet(KeyExtent extent) {
+ forTable(extent.getTableId());
+ this.range = new Range(extent.getMetadataEntry());
return this;
}
@Override
- public TableOptions from(AccumuloClient client) {
- this.client = client;
+ public Options overRange(Range range) {
+ this.range = TabletsSection.getRange().clip(range);
return this;
}
@Override
- public Options checkConsistency() {
- this.checkConsistency = true;
+ public Options overlapping(Text startRow, Text endRow) {
+ this.range = new KeyExtent(tableId, null, startRow).toMetadataRange();
+ this.endRow = endRow;
return this;
}
@@ -279,57 +252,153 @@ public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable
}
@Override
- public Options overTabletRange() {
+ public RangeOptions scanTable(String tableName) {
+ this.table = tableName;
this.range = TabletsSection.getRange();
return this;
}
+ }
- @Override
- public Options overRange(Range range) {
- this.range = range;
- return this;
+ public interface Options {
+ TabletsMetadata build(AccumuloClient client);
+
+ TabletsMetadata build(ClientContext ctx);
+
+ /**
+ * Checks that the metadata table forms a linked list and automatically backs up until it does.
+ */
+ Options checkConsistency();
+
+ Options fetchCloned();
+
+ Options fetchCompactId();
+
+ Options fetchDir();
+
+ Options fetchFiles();
+
+ Options fetchFlushId();
+
+ Options fetchLast();
+
+ Options fetchLoaded();
+
+ Options fetchLocation();
+
+ Options fetchLogs();
+
+ Options fetchPrev();
+
+ Options fetchScans();
+
+ Options fetchTime();
+
+ /**
+ * Saves the key values seen in the metadata table for each tablet.
+ */
+ Options saveKeyValues();
+ }
+
+ public interface RangeOptions extends Options {
+ Options overRange(Range range);
+ }
+
+ public interface TableOptions {
+
+ /**
+ * Get the tablet metadata for this extent's end row. This should only ever return a single
+ * tablet. No checking is done for prev row, so it could differ.
+ */
+ Options forTablet(KeyExtent extent);
+
+ /**
+ * This method automatically determines where the metadata for the passed in table ID resides.
+ * For example if a user table ID is passed in, then the metadata table is scanned. If the
+ * metadata table ID is passed in then the root table is scanned. Defaults to returning all
+ * tablets for the table ID.
+ */
+ TableRangeOptions forTable(Table.ID tableId);
+
+ /**
+ * Obtain tablet metadata by scanning the metadata table. Defaults to the range
+ * {@link TabletsSection#getRange()}
+ */
+ default RangeOptions scanMetadataTable() {
+ return scanTable(MetadataTable.NAME);
}
- @Override
- public Options overRange(ID tableId) {
- this.range = TabletsSection.getRange(tableId);
- return this;
+ /**
+ * Obtain tablet metadata by scanning an arbitrary table. Defaults to the range
+ * {@link TabletsSection#getRange()}
+ */
+ RangeOptions scanTable(String tableName);
+ }
+
+ public interface TableRangeOptions extends Options {
+
+ /**
+ * @see #overlapping(Text, Text)
+ */
+ default Options overlapping(byte[] startRow, byte[] endRow) {
+ return overlapping(startRow == null ? null : new Text(startRow),
+ endRow == null ? null : new Text(endRow));
}
- @Override
- public Options overRange(ID tableId, Text startRow, Text endRow) {
- this.range = new KeyExtent(tableId, null, startRow).toMetadataRange();
+ /**
+ * Limit to tablets that overlap the range {@code (startRow, endRow]}. Can pass null
+ * representing -inf and +inf. The impl creates open ended ranges which may be problematic, see
+ * #813.
+ */
+ Options overlapping(Text startRow, Text endRow);
+ }
+
+ private static class TabletMetadataIterator implements Iterator<TabletMetadata> {
+
+ private boolean sawLast = false;
+ private Iterator<TabletMetadata> iter;
+ private Text endRow;
+
+ TabletMetadataIterator(Iterator<TabletMetadata> source, Text endRow) {
+ this.iter = source;
this.endRow = endRow;
- return this;
}
@Override
- public RangeOptions scanTable(String tableName) {
- this.table = tableName;
- return this;
+ public boolean hasNext() {
+ return !sawLast && iter.hasNext();
}
@Override
- public RangeOptions scanMetadataTable() {
- return scanTable(MetadataTable.NAME);
+ public TabletMetadata next() {
+ if (sawLast) {
+ throw new NoSuchElementException();
+ }
+ TabletMetadata next = iter.next();
+ // impossible to construct a range that stops at the first tablet that contains endRow. That
+ // is why this specialized code exists.
+ if (next.getExtent().contains(endRow)) {
+ sawLast = true;
+ }
+ return next;
}
+ }
- @Override
- public RangeOptions scanRootTable() {
- return scanTable(RootTable.NAME);
- }
+ public static TableOptions builder() {
+ return new Builder();
}
private Scanner scanner;
+
private Iterable<TabletMetadata> tablets;
- private MetadataScanner(Scanner scanner, Iterable<TabletMetadata> tmi) {
+ private TabletsMetadata(Scanner scanner, Iterable<TabletMetadata> tmi) {
this.scanner = scanner;
this.tablets = tmi;
}
- public static SourceOptions builder() {
- return new Builder();
+ @Override
+ public void close() {
+ scanner.close();
}
@Override
@@ -340,9 +409,4 @@ public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable
public Stream<TabletMetadata> stream() {
return StreamSupport.stream(tablets.spliterator(), false);
}
-
- @Override
- public void close() {
- scanner.close();
- }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 1a4a1e2..b6e6c45 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -56,8 +56,8 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.crypto.CryptoService;
@@ -166,9 +166,9 @@ public class Gatherer {
Predicate<String> fileSelector)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx).scanMetadataTable()
- .overRange(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast().fetchPrev()
- .build();
+ Iterable<TabletMetadata> tmi = TabletsMetadata.builder().forTable(tableId)
+ .overlapping(startRow, endRow).fetchFiles().fetchLocation().fetchLast().fetchPrev()
+ .build(ctx);
// get a subset of files
Map<String,List<TabletMetadata>> files = new HashMap<>();
@@ -525,9 +525,8 @@ public class Gatherer {
private int countFiles()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
// TODO use a batch scanner + iterator to parallelize counting files
- return MetadataScanner.builder().from(ctx).scanMetadataTable()
- .overRange(tableId, startRow, endRow).fetchFiles().fetchPrev().build().stream()
- .mapToInt(tm -> tm.getFiles().size()).sum();
+ return TabletsMetadata.builder().forTable(tableId).overlapping(startRow, endRow).fetchFiles()
+ .fetchPrev().build(ctx).stream().mapToInt(tm -> tm.getFiles().size()).sum();
}
private class GatherRequest implements Supplier<SummaryCollection> {
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index fd7711b..7651788 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -80,21 +80,25 @@ public class LogEntry {
private static final Text EMPTY_TEXT = new Text();
- public static LogEntry fromKeyValue(Key key, Value value) {
- String qualifier = key.getColumnQualifier().toString();
+ public static LogEntry fromKeyValue(Key key, String value) {
+ String qualifier = key.getColumnQualifierData().toString();
if (qualifier.indexOf('/') < 1) {
throw new IllegalArgumentException("Bad key for log entry: " + key);
}
KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
- String[] parts = key.getColumnQualifier().toString().split("/", 2);
+ String[] parts = qualifier.split("/", 2);
String server = parts[0];
// handle old-style log entries that specify log sets
- parts = value.toString().split("\\|")[0].split(";");
+ parts = value.split("\\|")[0].split(";");
String filename = parts[parts.length - 1];
long timestamp = key.getTimestamp();
return new LogEntry(extent, timestamp, server, filename);
}
+ public static LogEntry fromKeyValue(Key key, Value value) {
+ return fromKeyValue(key, value.toString());
+ }
+
public Text getRow() {
return extent.getMetadataEntry();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
index 64400df..dbbc8c3 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -19,25 +19,19 @@ package org.apache.accumulo.core.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Table;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -218,55 +212,23 @@ public class Merge {
protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename, Text start,
Text end) throws MergeException {
// open up metadata, walk through the tablets.
+
Table.ID tableId;
- Scanner scanner;
+ TabletsMetadata tablets;
try {
ClientContext context = new ClientContext(client);
tableId = Tables.getTableId(context, tablename);
- scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ tablets = TabletsMetadata.builder().scanMetadataTable()
+ .overRange(new KeyExtent(tableId, end, start).toMetadataRange()).fetchFiles().fetchPrev()
+ .build(context);
} catch (Exception e) {
throw new MergeException(e);
}
- scanner.setRange(new KeyExtent(tableId, end, start).toMetadataRange());
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- final Iterator<Entry<Key,Value>> iterator = scanner.iterator();
-
- return new Iterator<Size>() {
- Size next = fetch();
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- private Size fetch() {
- long tabletSize = 0;
- while (iterator.hasNext()) {
- Entry<Key,Value> entry = iterator.next();
- Key key = entry.getKey();
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- tabletSize += new DataFileValue(entry.getValue().get()).getSize();
- } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
- KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
- return new Size(extent, tabletSize);
- }
- }
- return null;
- }
- @Override
- public Size next() {
- Size result1 = next;
- next = fetch();
- return result1;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ return tablets.stream().map(tm -> {
+ long size = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
+ return new Size(tm.getExtent(), size);
+ }).iterator();
}
}
diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsImplTest.java
deleted file mode 100644
index 84d6335..0000000
--- a/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsImplTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.clientImpl;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
-import org.easymock.EasyMock;
-import org.junit.Test;
-
-public class TableOperationsImplTest {
-
- @Test
- public void waitForStoreTransitionScannerConfiguredCorrectly() throws Exception {
- final String tableName = "metadata";
- ClientContext context = EasyMock.createMock(ClientContext.class);
-
- TableOperationsImpl topsImpl = new TableOperationsImpl(context);
-
- AccumuloClient accumuloClient = EasyMock.createMock(AccumuloClient.class);
- Scanner scanner = EasyMock.createMock(Scanner.class);
-
- Range range = new KeyExtent(Table.ID.of("1"), null, null).toMetadataRange();
-
- EasyMock.expect(context.getClient()).andReturn(accumuloClient);
- EasyMock.expect(accumuloClient.createScanner(tableName, Authorizations.EMPTY))
- .andReturn(scanner);
-
- // Fetch the columns on the scanner
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
- EasyMock.expectLastCall();
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
- EasyMock.expectLastCall();
- scanner.fetchColumn(
- MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
- MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier());
- EasyMock.expectLastCall();
-
- // Set the Range
- scanner.setRange(range);
- EasyMock.expectLastCall();
-
- // IsolatedScanner -- make the verification pass, not really relevant
- EasyMock.expect(scanner.getRange()).andReturn(range).anyTimes();
- EasyMock.expect(scanner.getTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
- EasyMock.expect(scanner.getBatchTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
- EasyMock.expect(scanner.getBatchSize()).andReturn(1000);
- EasyMock.expect(scanner.getReadaheadThreshold()).andReturn(100L);
-
- EasyMock.replay(context, accumuloClient, scanner);
-
- topsImpl.createMetadataScanner(tableName, range);
-
- EasyMock.verify(context, accumuloClient, scanner);
- }
-
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
index bbfb224..533fbb9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
@@ -33,17 +33,10 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.function.Function;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.Table;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.ComparablePair;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
@@ -84,7 +77,20 @@ public abstract class GroupBalancer extends TabletBalancer {
}
protected Iterable<Pair<KeyExtent,Location>> getLocationProvider() {
- return new MetadataLocationProvider();
+ return () -> {
+ try {
+ return TabletsMetadata.builder().forTable(tableId).fetchLocation().fetchPrev()
+ .build(context).stream().map(tm -> {
+ Location loc = Location.NONE;
+ if (tm.hasCurrent()) {
+ loc = new Location(new TServerInstance(tm.getLocation()));
+ }
+ return new Pair<>(tm.getExtent(), loc);
+ }).iterator();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
}
/**
@@ -770,48 +776,4 @@ public abstract class GroupBalancer extends TabletBalancer {
}
}
}
-
- static class LocationFunction
- implements Function<Iterator<Entry<Key,Value>>,Pair<KeyExtent,Location>> {
- @Override
- public Pair<KeyExtent,Location> apply(Iterator<Entry<Key,Value>> input) {
- Location loc = Location.NONE;
- KeyExtent extent = null;
- while (input.hasNext()) {
- Entry<Key,Value> entry = input.next();
- if (entry.getKey().getColumnFamily()
- .equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)) {
- loc = new Location(
- new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()));
- } else if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN
- .hasColumns(entry.getKey())) {
- extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
- }
- }
-
- return new Pair<>(extent, loc);
- }
-
- }
-
- class MetadataLocationProvider implements Iterable<Pair<KeyExtent,Location>> {
-
- @Override
- public Iterator<Pair<KeyExtent,Location>> iterator() {
- try {
- Scanner scanner = new IsolatedScanner(
- context.getClient().createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
- MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
-
- RowIterator rowIter = new RowIterator(scanner);
-
- Function<Iterator<Entry<Key,Value>>,Pair<KeyExtent,Location>> f = new LocationFunction();
- return Iterators.transform(rowIter, x -> f.apply(x));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index 82311dd..45f13d3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.hadoop.io.Text;
@@ -73,6 +74,10 @@ public class TServerInstance implements Comparable<TServerInstance>, Serializabl
this(AddressUtil.parseAddress(new String(address.get(), UTF_8), false), session.toString());
}
+ public TServerInstance(Location location) {
+ this(location.getHostAndPort(), location.getSession());
+ }
+
public void putLocation(Mutation m) {
m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index f816f0c..aa06ff7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -61,7 +61,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
@@ -72,6 +71,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Sc
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -97,6 +97,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
/**
* provides a reference to the metadata table for updates by tablet servers
@@ -551,27 +552,19 @@ public class MetadataTableUtil {
}
} else {
- Table.ID systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
- try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) {
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.setRange(extent.toMetadataRange());
+ try (TabletsMetadata tablets = TabletsMetadata.builder().forTablet(extent).fetchFiles()
+ .fetchLogs().fetchPrev().build(context)) {
- for (Entry<Key,Value> entry : scanner) {
- if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
- throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected "
- + extent.getMetadataEntry());
- }
+ TabletMetadata tablet = Iterables.getOnlyElement(tablets);
- if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
- result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
- } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(new FileRef(fs, entry.getKey()), dfv);
- } else {
- throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
- }
- }
+ if (!tablet.getExtent().equals(extent))
+ throw new RuntimeException(
+ "Unexpected extent " + tablet.getExtent() + " expected " + extent);
+
+ result.addAll(tablet.getLogs());
+ tablet.getFilesMap().forEach((k, v) -> {
+ sizes.put(new FileRef(k, fs.getFullPath(tablet.getTableId(), k)), v);
+ });
}
}
@@ -713,7 +706,8 @@ public class MetadataTableUtil {
}
}
- private static void getFiles(Set<String> files, List<String> tabletFiles, Table.ID srcTableId) {
+ private static void getFiles(Set<String> files, Collection<String> tabletFiles,
+ Table.ID srcTableId) {
for (String file : tabletFiles) {
if (srcTableId != null && !file.startsWith("../") && !file.contains(":")) {
file = "../" + srcTableId + file;
@@ -766,13 +760,9 @@ public class MetadataTableUtil {
range = TabletsSection.getRange(tableId);
}
- try {
- return MetadataScanner.builder().from(client).scanTable(tableName).overRange(range)
- .checkConsistency().saveKeyValues().fetchFiles().fetchLocation().fetchLast().fetchCloned()
- .fetchPrev().fetchTime().build();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new RuntimeException(e);
- }
+ return TabletsMetadata.builder().scanTable(tableName).overRange(range).checkConsistency()
+ .saveKeyValues().fetchFiles().fetchLocation().fetchLast().fetchCloned().fetchPrev()
+ .fetchTime().build(client);
}
@VisibleForTesting
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index d6d49b5..1aac7a2 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -58,9 +58,9 @@ import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
@@ -280,9 +280,8 @@ public class SimpleGarbageCollector implements Iface {
public Stream<Reference> getReferences()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- Stream<TabletMetadata> tabletStream = MetadataScanner.builder().from(getClient())
- .scanTable(tableName).overTabletRange().checkConsistency().fetchDir().fetchFiles()
- .fetchScans().build().stream();
+ Stream<TabletMetadata> tabletStream = TabletsMetadata.builder().scanTable(tableName)
+ .checkConsistency().fetchDir().fetchFiles().fetchScans().build(getClient()).stream();
Stream<Reference> refStream = tabletStream.flatMap(tm -> {
Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream())
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 9df563c..16b8b8e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
@@ -33,9 +32,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
@@ -64,11 +60,10 @@ import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.TabletSplit;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationTable;
@@ -147,15 +142,17 @@ public class MasterClientServiceHandler extends FateServiceHandler
}
@Override
- public void waitForFlush(TInfo tinfo, TCredentials c, String tableIdStr, ByteBuffer startRow,
- ByteBuffer endRow, long flushID, long maxLoops)
+ public void waitForFlush(TInfo tinfo, TCredentials c, String tableIdStr, ByteBuffer startRowBB,
+ ByteBuffer endRowBB, long flushID, long maxLoops)
throws ThriftSecurityException, ThriftTableOperationException {
Table.ID tableId = Table.ID.of(tableIdStr);
Namespace.ID namespaceId = getNamespaceIdFromTableId(TableOperation.FLUSH, tableId);
master.security.canFlush(c, tableId, namespaceId);
- if (endRow != null && startRow != null
- && ByteBufferUtil.toText(startRow).compareTo(ByteBufferUtil.toText(endRow)) >= 0)
+ Text startRow = ByteBufferUtil.toText(startRowBB);
+ Text endRow = ByteBufferUtil.toText(endRowBB);
+
+ if (endRow != null && startRow != null && startRow.compareTo(endRow) >= 0)
throw new ThriftTableOperationException(tableId.canonicalID(), null, TableOperation.FLUSH,
TableOperationExceptionType.BAD_RANGE, "start row must be less than end row");
@@ -167,13 +164,16 @@ public class MasterClientServiceHandler extends FateServiceHandler
try {
final TServerConnection server = master.tserverSet.getConnection(instance);
if (server != null)
- server.flush(master.masterLock, tableId, ByteBufferUtil.toBytes(startRow),
- ByteBufferUtil.toBytes(endRow));
+ server.flush(master.masterLock, tableId, ByteBufferUtil.toBytes(startRowBB),
+ ByteBufferUtil.toBytes(endRowBB));
} catch (TException ex) {
Master.log.error(ex.toString());
}
}
+ if (tableId.equals(RootTable.ID))
+ break; // this code does not properly handle the root tablet. See #798
+
if (l == maxLoops - 1)
break;
@@ -181,71 +181,23 @@ public class MasterClientServiceHandler extends FateServiceHandler
serversToFlush.clear();
- try {
- AccumuloClient client = master.getClient();
- Scanner scanner;
- if (tableId.equals(MetadataTable.ID)) {
- scanner = new IsolatedScanner(client.createScanner(RootTable.NAME, Authorizations.EMPTY));
- scanner.setRange(MetadataSchema.TabletsSection.getRange());
- } else {
- scanner = new IsolatedScanner(
- client.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- Range range = new KeyExtent(tableId, null, ByteBufferUtil.toText(startRow))
- .toMetadataRange();
- scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
- }
- TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
-
- RowIterator ri = new RowIterator(scanner);
-
+ try (TabletsMetadata tablets = TabletsMetadata.builder().forTable(tableId)
+ .overlapping(startRow, endRow).fetchFlushId().fetchLocation().fetchLogs().fetchPrev()
+ .build(master.getContext())) {
int tabletsToWaitFor = 0;
int tabletCount = 0;
- Text ert = ByteBufferUtil.toText(endRow);
-
- while (ri.hasNext()) {
- Iterator<Entry<Key,Value>> row = ri.next();
- long tabletFlushID = -1;
- int logs = 0;
- boolean online = false;
-
- TServerInstance server = null;
-
- Entry<Key,Value> entry = null;
- while (row.hasNext()) {
- entry = row.next();
- Key key = entry.getKey();
-
- if (TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(),
- key.getColumnQualifier())) {
- tabletFlushID = Long.parseLong(entry.getValue().toString());
- }
-
- if (LogColumnFamily.NAME.equals(key.getColumnFamily()))
- logs++;
-
- if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily())) {
- online = true;
- server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
- }
-
- }
+ for (TabletMetadata tablet : tablets) {
+ int logs = tablet.getLogs().size();
// when tablet is not online and has no logs, there is no reason to wait for it
- if ((online || logs > 0) && tabletFlushID < flushID) {
+ if ((tablet.hasCurrent() || logs > 0) && tablet.getFlushId().orElse(-1) < flushID) {
tabletsToWaitFor++;
- if (server != null)
- serversToFlush.add(server);
+ if (tablet.hasCurrent())
+ serversToFlush.add(new TServerInstance(tablet.getLocation()));
}
tabletCount++;
-
- Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
- if (tabletEndRow == null || (ert != null && tabletEndRow.compareTo(ert) >= 0))
- break;
}
if (tabletsToWaitFor == 0)
@@ -257,15 +209,9 @@ public class MasterClientServiceHandler extends FateServiceHandler
throw new ThriftTableOperationException(tableId.canonicalID(), null, TableOperation.FLUSH,
TableOperationExceptionType.NOTFOUND, null);
- } catch (AccumuloException | TabletDeletedException e) {
+ } catch (TabletDeletedException e) {
Master.log.debug("Failed to scan {} table to wait for flush {}", MetadataTable.NAME,
tableId, e);
- } catch (AccumuloSecurityException e) {
- Master.log.warn("{}", e.getMessage(), e);
- throw new ThriftSecurityException();
- } catch (TableNotFoundException e) {
- Master.log.error("{}", e.getMessage(), e);
- throw new ThriftTableOperationException();
}
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index 6fd3568..a89799a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -41,9 +41,9 @@ import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
@@ -271,9 +271,9 @@ class LoadFiles extends MasterRepo {
Text startRow = loadMapEntry.getKey().getPrevEndRow();
- Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master.getContext())
- .scanMetadataTable().overRange(tableId, startRow, null).checkConsistency().fetchPrev()
- .fetchLocation().fetchLoaded().build().iterator();
+ Iterator<TabletMetadata> tabletIter = TabletsMetadata.builder().forTable(tableId)
+ .overlapping(startRow, null).checkConsistency().fetchPrev().fetchLocation().fetchLoaded()
+ .build(master.getContext()).iterator();
List<TabletMetadata> tablets = new ArrayList<>();
TabletMetadata currentTablet = tabletIter.next();
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index 3797d86..c608a96 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -36,8 +36,8 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -163,9 +163,9 @@ public class PrepBulkImport extends MasterRepo {
Iterators.transform(lmi, entry -> entry.getKey());
TabletIterFactory tabletIterFactory = startRow -> {
- return MetadataScanner.builder().from(master.getContext()).scanMetadataTable()
- .overRange(bulkInfo.tableId, startRow, null).checkConsistency().fetchPrev().build()
- .stream().map(TabletMetadata::getExtent).iterator();
+ return TabletsMetadata.builder().forTable(bulkInfo.tableId).overlapping(startRow, null)
+ .checkConsistency().fetchPrev().build(master.getContext()).stream()
+ .map(TabletMetadata::getExtent).iterator();
};
checkForMerge(bulkInfo.tableId.canonicalID(),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
index 21373a4..2cb3f23 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
@@ -17,30 +17,18 @@
package org.apache.accumulo.master.tableOps.compact;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.clientImpl.Table;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -49,7 +37,6 @@ import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.Utils;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.slf4j.LoggerFactory;
@@ -75,6 +62,11 @@ class CompactionDriver extends MasterRepo {
@Override
public long isReady(long tid, Master master) throws Exception {
+ if (tableId.equals(RootTable.ID)) {
+ // this code does not properly handle the root table. See #798
+ return 0;
+ }
+
String zCancelID = Constants.ZROOT + "/" + master.getInstanceID() + Constants.ZTABLES + "/"
+ tableId + Constants.ZTABLE_COMPACT_CANCEL_ID;
@@ -87,60 +79,24 @@ class CompactionDriver extends MasterRepo {
}
MapCounter<TServerInstance> serversToFlush = new MapCounter<>();
- AccumuloClient client = master.getClient();
-
- Scanner scanner;
-
- if (tableId.equals(MetadataTable.ID)) {
- scanner = new IsolatedScanner(client.createScanner(RootTable.NAME, Authorizations.EMPTY));
- scanner.setRange(MetadataSchema.TabletsSection.getRange());
- } else {
- scanner = new IsolatedScanner(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- Range range = new KeyExtent(tableId, null, startRow == null ? null : new Text(startRow))
- .toMetadataRange();
- scanner.setRange(range);
- }
-
- TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
-
long t1 = System.currentTimeMillis();
- RowIterator ri = new RowIterator(scanner);
int tabletsToWaitFor = 0;
int tabletCount = 0;
- while (ri.hasNext()) {
- Iterator<Entry<Key,Value>> row = ri.next();
- long tabletCompactID = -1;
-
- TServerInstance server = null;
+ TabletsMetadata tablets = TabletsMetadata.builder().forTable(tableId)
+ .overlapping(startRow, endRow).fetchLocation().fetchPrev().fetchCompactId()
+ .build(master.getContext());
- Entry<Key,Value> entry = null;
- while (row.hasNext()) {
- entry = row.next();
- Key key = entry.getKey();
-
- if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(),
- key.getColumnQualifier()))
- tabletCompactID = Long.parseLong(entry.getValue().toString());
-
- if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
- server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
- }
-
- if (tabletCompactID < compactId) {
+ for (TabletMetadata tablet : tablets) {
+ if (tablet.getCompactId().orElse(-1) < compactId) {
tabletsToWaitFor++;
- if (server != null)
- serversToFlush.increment(server, 1);
+ if (tablet.hasCurrent()) {
+ serversToFlush.increment(new TServerInstance(tablet.getLocation()), 1);
+ }
}
tabletCount++;
-
- Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
- if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
- break;
}
long scanTime = System.currentTimeMillis() - t1;
@@ -170,11 +126,9 @@ class CompactionDriver extends MasterRepo {
long sleepTime = 500;
+ // make wait time depend on the server with the most to compact
if (serversToFlush.size() > 0)
- sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on
- // the server with the most
- // to
- // compact
+ sleepTime = Collections.max(serversToFlush.values()) * sleepTime;
sleepTime = Math.max(2 * scanTime, sleepTime);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index 3fe673e..0c2c25a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -52,8 +52,8 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.MemoryUnit;
@@ -376,9 +376,9 @@ public class BulkLoadIT extends AccumuloClusterHarness {
Set<String> endRowsSeen = new HashSet<>();
String id = client.tableOperations().tableIdMap().get(tableName);
- try (MetadataScanner scanner = MetadataScanner.builder().from(client).scanMetadataTable()
- .overRange(Table.ID.of(id)).fetchFiles().fetchLoaded().fetchPrev().build()) {
- for (TabletMetadata tablet : scanner) {
+ try (TabletsMetadata tablets = TabletsMetadata.builder().forTable(Table.ID.of(id)).fetchFiles()
+ .fetchLoaded().fetchPrev().build(client)) {
+ for (TabletMetadata tablet : tablets) {
assertTrue(tablet.getLoaded().isEmpty());
Set<String> fileHashes = tablet.getFiles().stream().map(f -> hash(f))