You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/05/24 15:55:56 UTC
[accumulo] branch master updated: fixes #469 made bulk import scan
split tolerant (#488)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 9feb5e1 fixes #469 made bulk import scan split tolerant (#488)
9feb5e1 is described below
commit 9feb5e1dce0082b188a70753f3587bc1ba085c0f
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon May 14 18:52:25 2018 -0400
fixes #469 made bulk import scan split tolerant (#488)
* Made MetadataScanner check structure and retry when linked list broken
* Removed TabletIterator
* Made code that used TabletIterator use MetadataScanner
---
.../core/metadata/schema/LinkingIterator.java | 154 ++++++++++++
.../core/metadata/schema/MetadataScanner.java | 218 ++++++++++++-----
.../core/metadata/schema/MetadataSchema.java | 31 ++-
.../metadata/schema/TabletDeletedException.java | 27 +++
.../core/metadata/schema/TabletMetadata.java | 194 +++++++++++----
.../org/apache/accumulo/core/summary/Gatherer.java | 12 +-
.../core/metadata/schema/LinkingIteratorTest.java | 112 +++++++++
.../accumulo/server/util/MetadataTableUtil.java | 125 +++++-----
.../accumulo/server/util/TabletIterator.java | 267 ---------------------
.../accumulo/gc/GarbageCollectionAlgorithm.java | 42 ++--
.../accumulo/gc/GarbageCollectionEnvironment.java | 16 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 34 +--
.../apache/accumulo/gc/GarbageCollectionTest.java | 51 +---
.../master/MasterClientServiceHandler.java | 2 +-
.../master/tableOps/bulkVer2/LoadFiles.java | 7 +-
.../master/tableOps/bulkVer2/PrepBulkImport.java | 6 +-
.../java/org/apache/accumulo/test/CloneIT.java | 6 +-
.../apache/accumulo/test/functional/MergeIT.java | 78 ------
18 files changed, 754 insertions(+), 628 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java
new file mode 100644
index 0000000..e95a8af
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Tablets for a table in the metadata table should form a linked list. This iterator detects when
+ * tablets do not form a linked list and backs up when this happens.
+ *
+ * <p>
+ * The purpose of this is to hide inconsistencies caused by splits and detect anomalies in the
+ * metadata table.
+ *
+ * <p>
+ * If a tablet that was returned by this iterator is subsequently deleted from the metadata table,
+ * then this iterator will throw a TabletDeletedException. This could occur when a table is merged.
+ */
+public class LinkingIterator implements Iterator<TabletMetadata> {
+
+  private static final Logger log = LoggerFactory.getLogger(LinkingIterator.class);
+
+  // Range requested by the caller; its end bound is reused when the scan is re-seeked.
+  private final Range range;
+  // Creates a fresh tablet iterator over a given metadata range.
+  private final Function<Range,Iterator<TabletMetadata>> iteratorFactory;
+  // Iterator currently being read; replaced by resetSource() after a bad tablet is seen.
+  private Iterator<TabletMetadata> source;
+  // Last tablet handed to the caller, or null if nothing has been returned yet.
+  private TabletMetadata prevTablet = null;
+
+  /**
+   * Creates an iterator that reads tablets from the iterator {@code iteratorFactory} produces for
+   * {@code range}.
+   *
+   * @param iteratorFactory
+   *          produces an iterator over the tablets in a metadata range; it is called again every
+   *          time the scan has to be restarted
+   * @param range
+   *          metadata range to iterate over
+   */
+  LinkingIterator(Function<Range,Iterator<TabletMetadata>> iteratorFactory, Range range) {
+    this.range = range;
+    this.iteratorFactory = iteratorFactory;
+    source = iteratorFactory.apply(range);
+  }
+
+  /**
+   * Reports whether the current underlying iterator has more tablets. No link checking is done
+   * here.
+   */
+  @Override
+  public boolean hasNext() {
+    return source.hasNext();
+  }
+
+  /**
+   * Checks whether {@code curr} correctly follows {@code prev} in the linked list of tablets.
+   *
+   * <p>
+   * When the two tablets belong to different tables, {@code prev} must have a null end row and
+   * {@code curr} must have a null prev end row. When they belong to the same table, the prev end
+   * row of {@code curr} must equal the end row of {@code prev}. A {@code curr} for which no prev
+   * end row was seen is never a good transition.
+   *
+   * @param prev
+   *          tablet that was read before {@code curr}
+   * @param curr
+   *          tablet that was just read
+   * @return true if the transition is consistent, false otherwise
+   * @throws IllegalStateException
+   *           if both tablets are in the same table and {@code prev} has a null end row
+   */
+  static boolean goodTransition(TabletMetadata prev, TabletMetadata curr) {
+    if (!curr.sawPrevEndRow()) {
+      log.warn("Tablet {} had no prev end row.", curr.getExtent());
+      return false;
+    }
+
+    if (!curr.getTableId().equals(prev.getTableId())) {
+      // crossing from one table to the next
+      if (prev.getEndRow() != null) {
+        log.debug("Non-null end row for last tablet in table: {} {}", prev.getExtent(),
+            curr.getExtent());
+        return false;
+      }
+
+      if (curr.getPrevEndRow() != null) {
+        log.debug("First tablet for table had prev end row {} {} ", prev.getExtent(),
+            curr.getExtent());
+        return false;
+      }
+    } else {
+      // both tablets are in the same table
+      if (prev.getEndRow() == null) {
+        throw new IllegalStateException("Null end row for tablet in middle of table: "
+            + prev.getExtent() + " " + curr.getExtent());
+      }
+
+      if (curr.getPrevEndRow() == null || !prev.getEndRow().equals(curr.getPrevEndRow())) {
+        log.debug("Tablets end row and prev end row not equals {} {} ", prev.getExtent(),
+            curr.getExtent());
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Replaces {@link #source} with a new iterator positioned after the last tablet returned.
+   *
+   * <p>
+   * If no tablet has been returned yet, the scan restarts over the whole range. Otherwise the
+   * metadata row of the previously returned tablet is looked up first, and the scan resumes at the
+   * row following it, keeping the end bound of the original range.
+   *
+   * @throws TabletDeletedException
+   *           if the previously returned tablet is no longer found in the metadata table
+   */
+  private void resetSource() {
+    if (prevTablet == null) {
+      source = iteratorFactory.apply(range);
+    } else {
+      // get the metadata table row for the previous tablet
+      Text prevMetaRow = KeyExtent.getMetadataEntry(prevTablet.getTableId(),
+          prevTablet.getEndRow());
+
+      // ensure the previous tablet still exists in the metadata table
+      if (Iterators.size(iteratorFactory.apply(new Range(prevMetaRow))) == 0) {
+        throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating");
+      }
+
+      // start scanning at next possible row in metadata table
+      Range seekRange = new Range(new Key(prevMetaRow).followingKey(PartialKey.ROW), true,
+          range.getEndKey(), range.isEndKeyInclusive());
+
+      log.info("Resetting scanner to {}", seekRange);
+
+      source = iteratorFactory.apply(seekRange);
+    }
+  }
+
+  /**
+   * Returns the next tablet that links correctly to the previously returned one.
+   *
+   * <p>
+   * The first tablet only needs to have a prev end row; every later tablet must pass
+   * {@link #goodTransition(TabletMetadata, TabletMetadata)}. When a tablet fails its check, this
+   * method sleeps, resets the source and tries again. The sleep starts at 250 milliseconds and
+   * doubles after each failed attempt, up to a maximum of 5 seconds.
+   *
+   * @return the next tablet in the linked list
+   * @throws TabletDeletedException
+   *           if the previously returned tablet no longer exists when the source is reset
+   * @throws IllegalStateException
+   *           if a tablet in the middle of a table has a null end row
+   */
+  @Override
+  public TabletMetadata next() {
+
+    long sleepTime = 250;
+
+    TabletMetadata currTablet = null;
+    while (currTablet == null) {
+      // NOTE(review): read without a hasNext() check, also right after a reset; what happens on an
+      // exhausted source depends on the iterator the factory returns - confirm.
+      TabletMetadata tmp = source.next();
+
+      if (prevTablet == null) {
+        if (tmp.sawPrevEndRow()) {
+          currTablet = tmp;
+        } else {
+          log.warn("Tablet has no prev end row {} {}", tmp.getTableId(), tmp.getEndRow());
+        }
+      } else if (goodTransition(prevTablet, tmp)) {
+        currTablet = tmp;
+      }
+
+      if (currTablet == null) {
+        // back off before re-reading the metadata table
+        sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+        resetSource();
+        sleepTime = Math.min(2 * sleepTime, 5000);
+      }
+    }
+
+    prevTablet = currTablet;
+    return currTablet;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
index 44b7611..6fc05bb 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
@@ -17,6 +17,8 @@
package org.apache.accumulo.core.metadata.schema;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import java.util.ArrayList;
@@ -24,58 +26,89 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.client.impl.Table.ID;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.FetchedColumns;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
-import com.google.common.base.Preconditions;
-
-public class MetadataScanner {
+public class MetadataScanner implements Iterable<TabletMetadata>, AutoCloseable {
public interface SourceOptions {
- TableOptions from(Scanner scanner);
-
TableOptions from(ClientContext ctx);
+
+ TableOptions from(Connector conn);
}
- public interface TableOptions {
- ColumnOptions overRootTable();
+ public interface TableOptions extends RangeOptions {
+ /**
+ * Optionally set a table name, defaults to {@value MetadataTable#NAME}
+ */
+ RangeOptions scanTable(String tableName);
+ }
- ColumnOptions overMetadataTable();
+ public interface RangeOptions {
+ Options overTabletRange();
- ColumnOptions overUserTableId(Table.ID tableId);
+ Options overRange(Range range);
- ColumnOptions overUserTableId(Table.ID tableId, Text startRow, Text endRow);
+ Options overRange(Table.ID tableId);
+
+ Options overRange(Table.ID tableId, Text startRow, Text endRow);
}
- public interface ColumnOptions {
- ColumnOptions fetchFiles();
+ public interface Options {
+ /**
+ * Checks that the metadata table forms a linked list and automatically backs up until it does.
+ */
+ Options checkConsistency();
+
+ /**
+ * Saves the key values seen in the metadata table for each tablet.
+ */
+ Options saveKeyValues();
+
+ Options fetchFiles();
- ColumnOptions fetchLoaded();
+ Options fetchLoaded();
- ColumnOptions fetchLocation();
+ Options fetchLocation();
- ColumnOptions fetchPrev();
+ Options fetchPrev();
- ColumnOptions fetchLast();
+ Options fetchLast();
- Iterable<TabletMetadata> build()
+ Options fetchScans();
+
+ Options fetchDir();
+
+ Options fetchTime();
+
+ Options fetchCloned();
+
+ MetadataScanner build()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
}
@@ -108,34 +141,41 @@ public class MetadataScanner {
}
}
- private static class Builder implements SourceOptions, TableOptions, ColumnOptions {
+ private static class Builder implements SourceOptions, TableOptions, RangeOptions, Options {
private List<Text> families = new ArrayList<>();
private List<ColumnFQ> qualifiers = new ArrayList<>();
- private Scanner scanner;
- private ClientContext ctx;
- private String table;
- private Table.ID userTableId;
+ private Connector conn;
+ private String table = MetadataTable.NAME;
+ private Range range;
private EnumSet<FetchedColumns> fetchedCols = EnumSet.noneOf(FetchedColumns.class);
- private Text startRow;
private Text endRow;
+ private boolean checkConsistency = false;
+ private boolean saveKeyValues;
@Override
- public ColumnOptions fetchFiles() {
+ public Options fetchFiles() {
fetchedCols.add(FetchedColumns.FILES);
families.add(DataFileColumnFamily.NAME);
return this;
}
@Override
- public ColumnOptions fetchLoaded() {
+ public Options fetchScans() {
+ fetchedCols.add(FetchedColumns.SCANS);
+ families.add(ScanFileColumnFamily.NAME);
+ return this;
+ }
+
+ @Override
+ public Options fetchLoaded() {
fetchedCols.add(FetchedColumns.LOADED);
- families.add(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME);
+ families.add(BulkFileColumnFamily.NAME);
return this;
}
@Override
- public ColumnOptions fetchLocation() {
+ public Options fetchLocation() {
fetchedCols.add(FetchedColumns.LOCATION);
families.add(CurrentLocationColumnFamily.NAME);
families.add(FutureLocationColumnFamily.NAME);
@@ -143,31 +183,49 @@ public class MetadataScanner {
}
@Override
- public ColumnOptions fetchPrev() {
+ public Options fetchPrev() {
fetchedCols.add(FetchedColumns.PREV_ROW);
qualifiers.add(PREV_ROW_COLUMN);
return this;
}
@Override
- public ColumnOptions fetchLast() {
+ public Options fetchDir() {
+ fetchedCols.add(FetchedColumns.DIR);
+ qualifiers.add(DIRECTORY_COLUMN);
+ return this;
+ }
+
+ @Override
+ public Options fetchLast() {
fetchedCols.add(FetchedColumns.LAST);
families.add(LastLocationColumnFamily.NAME);
return this;
}
@Override
- public Iterable<TabletMetadata> build()
+ public Options fetchTime() {
+ fetchedCols.add(FetchedColumns.TIME);
+ qualifiers.add(TIME_COLUMN);
+ return this;
+ }
+
+ @Override
+ public Options fetchCloned() {
+ fetchedCols.add(FetchedColumns.CLONED);
+ families.add(ClonedColumnFamily.NAME);
+ return this;
+ }
+
+ @Override
+ public MetadataScanner build()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- if (ctx != null) {
- scanner = new IsolatedScanner(
- ctx.getConnector().createScanner(table, Authorizations.EMPTY));
- } else if (!(scanner instanceof IsolatedScanner)) {
- scanner = new IsolatedScanner(scanner);
- }
- if (userTableId != null) {
- scanner.setRange(new KeyExtent(userTableId, null, startRow).toMetadataRange());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY));
+ scanner.setRange(range);
+
+ if (checkConsistency && !fetchedCols.contains(FetchedColumns.PREV_ROW)) {
+ fetchPrev();
}
for (Text fam : families) {
@@ -182,69 +240,101 @@ public class MetadataScanner {
fetchedCols = EnumSet.allOf(FetchedColumns.class);
}
- Iterable<TabletMetadata> tmi = TabletMetadata.convert(scanner, fetchedCols);
+ Iterable<TabletMetadata> tmi = TabletMetadata.convert(scanner, fetchedCols, checkConsistency,
+ saveKeyValues);
if (endRow != null) {
// create an iterable that will stop at the tablet which contains the endRow
- return new Iterable<TabletMetadata>() {
- @Override
- public Iterator<TabletMetadata> iterator() {
- return new TabletMetadataIterator(tmi.iterator(), endRow);
- }
- };
+ return new MetadataScanner(scanner,
+ () -> new TabletMetadataIterator(tmi.iterator(), endRow));
} else {
- return tmi;
+ return new MetadataScanner(scanner, tmi);
}
+ }
+ @Override
+ public TableOptions from(ClientContext ctx) {
+ try {
+ this.conn = ctx.getConnector();
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ return this;
}
@Override
- public ColumnOptions overRootTable() {
- this.table = RootTable.NAME;
+ public TableOptions from(Connector conn) {
+ this.conn = conn;
return this;
}
@Override
- public ColumnOptions overMetadataTable() {
- this.table = MetadataTable.NAME;
+ public Options checkConsistency() {
+ this.checkConsistency = true;
return this;
}
@Override
- public ColumnOptions overUserTableId(Table.ID tableId) {
- Preconditions
- .checkArgument(!tableId.equals(RootTable.ID) && !tableId.equals(MetadataTable.ID));
+ public Options saveKeyValues() {
+ this.saveKeyValues = true;
+ return this;
+ }
- this.table = MetadataTable.NAME;
- this.userTableId = tableId;
+ @Override
+ public Options overTabletRange() {
+ this.range = TabletsSection.getRange();
return this;
}
@Override
- public TableOptions from(Scanner scanner) {
- this.scanner = scanner;
+ public Options overRange(Range range) {
+ this.range = range;
return this;
}
@Override
- public TableOptions from(ClientContext ctx) {
- this.ctx = ctx;
+ public Options overRange(ID tableId) {
+ this.range = TabletsSection.getRange(tableId);
return this;
}
@Override
- public ColumnOptions overUserTableId(Table.ID tableId, Text startRow, Text endRow) {
- this.table = MetadataTable.NAME;
- this.userTableId = tableId;
- this.startRow = startRow;
+ public Options overRange(ID tableId, Text startRow, Text endRow) {
+ this.range = new KeyExtent(tableId, null, startRow).toMetadataRange();
this.endRow = endRow;
return this;
}
+ @Override
+ public RangeOptions scanTable(String tableName) {
+ this.table = tableName;
+ return this;
+ }
+ }
+
+ private Scanner scanner;
+ private Iterable<TabletMetadata> tablets;
+
+ private MetadataScanner(Scanner scanner, Iterable<TabletMetadata> tmi) {
+ this.scanner = scanner;
+ this.tablets = tmi;
}
public static SourceOptions builder() {
return new Builder();
}
+ @Override
+ public Iterator<TabletMetadata> iterator() {
+ return tablets.iterator();
+ }
+
+ public Stream<TabletMetadata> stream() {
+ return StreamSupport.stream(tablets.spliterator(), false);
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 5309d6a..b51526a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -75,7 +75,8 @@ public class MetadataSchema {
* {@link #PREV_ROW_COLUMN} sits in this and that needs to sort last because the
* SimpleGarbageCollector relies on this.
*/
- public static final Text NAME = new Text("~tab");
+ public static final String STR_NAME = "~tab";
+ public static final Text NAME = new Text(STR_NAME);
/**
* README : very important that prevRow sort last to avoid race conditions between garbage
* collector and split this needs to sort after everything else for that tablet
@@ -95,7 +96,8 @@ public class MetadataSchema {
* Column family for recording information used by the TServer
*/
public static class ServerColumnFamily {
- public static final Text NAME = new Text("srv");
+ public static final String STR_NAME = "srv";
+ public static final Text NAME = new Text(STR_NAME);
/**
* Holds the location of the tablet in the DFS file system
*/
@@ -124,27 +126,31 @@ public class MetadataSchema {
* that it was assigned
*/
public static class CurrentLocationColumnFamily {
- public static final Text NAME = new Text("loc");
+ public static final String STR_NAME = "loc";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Column family for storing the assigned location
*/
public static class FutureLocationColumnFamily {
- public static final Text NAME = new Text("future");
+ public static final String STR_NAME = "future";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Column family for storing last location, as a hint for assignment
*/
public static class LastLocationColumnFamily {
- public static final Text NAME = new Text("last");
+ public static final String STR_NAME = "last";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Column family for storing suspension location, as a demand for assignment.
*/
public static class SuspendLocationColumn {
+ public static final String STR_NAME = "suspend";
public static final ColumnFQ SUSPEND_COLUMN = new ColumnFQ(new Text("suspend"),
new Text("loc"));
}
@@ -153,21 +159,24 @@ public class MetadataSchema {
* Temporary markers that indicate a tablet loaded a bulk file
*/
public static class BulkFileColumnFamily {
- public static final Text NAME = new Text("loaded");
+ public static final String STR_NAME = "loaded";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Temporary marker that indicates a tablet was successfully cloned
*/
public static class ClonedColumnFamily {
- public static final Text NAME = new Text("!cloned");
+ public static final String STR_NAME = "!cloned";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Column family for storing files used by a tablet
*/
public static class DataFileColumnFamily {
- public static final Text NAME = new Text("file");
+ public static final String STR_NAME = "file";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
@@ -175,14 +184,16 @@ public class MetadataSchema {
* from being deleted
*/
public static class ScanFileColumnFamily {
- public static final Text NAME = new Text("scan");
+ public static final String STR_NAME = "scan";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
* Column family for storing write-ahead log entries
*/
public static class LogColumnFamily {
- public static final Text NAME = new Text("log");
+ public static final String STR_NAME = "log";
+ public static final Text NAME = new Text(STR_NAME);
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java
new file mode 100644
index 0000000..d22b1e8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata.schema;
+
+/**
+ * Unchecked exception signalling that a tablet was deleted from the metadata table. In this
+ * package it is thrown by {@link LinkingIterator} when a tablet it already returned can no longer
+ * be found while the scan is being reset.
+ */
+public class TabletDeletedException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param msg
+ *          detail message, passed unchanged to {@link RuntimeException}
+ */
+ public TabletDeletedException(String msg) {
+ super(msg);
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index b101971..e3e268f 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -17,6 +17,8 @@
package org.apache.accumulo.core.metadata.schema;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import java.util.EnumSet;
@@ -25,45 +27,61 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.function.Function;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.hadoop.io.Text;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterators;
public class TabletMetadata {
private Table.ID tableId;
private Text prevEndRow;
+ private boolean sawPrevEndRow = false;
private Text endRow;
private Location location;
private List<String> files;
+ private List<String> scans;
private Set<String> loadedFiles;
- private EnumSet<FetchedColumns> fetchedColumns;
+ private EnumSet<FetchedColumns> fetchedCols;
private KeyExtent extent;
private Location last;
+ private String dir;
+ private String time;
+ private String cloned;
+ private SortedMap<Key,Value> keyValues;
public static enum LocationType {
CURRENT, FUTURE, LAST
}
public static enum FetchedColumns {
- LOCATION, PREV_ROW, FILES, LAST, LOADED
+ LOCATION, PREV_ROW, FILES, LAST, LOADED, SCANS, DIR, TIME, CLONED
}
public static class Location {
@@ -101,47 +119,81 @@ public class TabletMetadata {
return extent;
}
+ private void ensureFetched(FetchedColumns col) {
+ Preconditions.checkState(fetchedCols.contains(col), "%s was not fetched", col);
+ }
+
public Text getPrevEndRow() {
- Preconditions.checkState(fetchedColumns.contains(FetchedColumns.PREV_ROW),
- "Requested prev row when it was not fetched");
+ ensureFetched(FetchedColumns.PREV_ROW);
return prevEndRow;
}
+ public boolean sawPrevEndRow() {
+ ensureFetched(FetchedColumns.PREV_ROW);
+ return sawPrevEndRow;
+ }
+
public Text getEndRow() {
return endRow;
}
public Location getLocation() {
- Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOCATION),
- "Requested location when it was not fetched");
+ ensureFetched(FetchedColumns.LOCATION);
return location;
}
public Set<String> getLoaded() {
- Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOADED),
- "Requested loaded when it was not fetched");
+ ensureFetched(FetchedColumns.LOADED);
return loadedFiles;
}
public Location getLast() {
- Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LAST),
- "Requested last when it was not fetched");
+ ensureFetched(FetchedColumns.LAST);
return last;
}
public List<String> getFiles() {
- Preconditions.checkState(fetchedColumns.contains(FetchedColumns.FILES),
- "Requested files when it was not fetched");
+ ensureFetched(FetchedColumns.FILES);
return files;
}
- public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter,
- EnumSet<FetchedColumns> fetchedColumns) {
+ public List<String> getScans() {
+ ensureFetched(FetchedColumns.SCANS);
+ return scans;
+ }
+
+ public String getDir() {
+ ensureFetched(FetchedColumns.DIR);
+ return dir;
+ }
+
+ public String getTime() {
+ ensureFetched(FetchedColumns.TIME);
+ return time;
+ }
+
+ public String getCloned() {
+ ensureFetched(FetchedColumns.CLONED);
+ return cloned;
+ }
+
+ public SortedMap<Key,Value> getKeyValues() {
+ Preconditions.checkState(keyValues != null, "Requested key values when it was not saved");
+ return keyValues;
+ }
+
+ private static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter,
+ EnumSet<FetchedColumns> fetchedColumns, boolean buildKeyValueMap) {
Objects.requireNonNull(rowIter);
TabletMetadata te = new TabletMetadata();
+ ImmutableSortedMap.Builder<Key,Value> kvBuilder = null;
+ if (buildKeyValueMap) {
+ kvBuilder = ImmutableSortedMap.naturalOrder();
+ }
Builder<String> filesBuilder = ImmutableList.builder();
+ Builder<String> scansBuilder = ImmutableList.builder();
final ImmutableSet.Builder<String> loadedFilesBuilder = ImmutableSet.builder();
ByteSequence row = null;
@@ -151,6 +203,10 @@ public class TabletMetadata {
Value v = kv.getValue();
Text fam = k.getColumnFamily();
+ if (buildKeyValueMap) {
+ kvBuilder.put(k, v);
+ }
+
if (row == null) {
row = k.getRowData();
KeyExtent ke = new KeyExtent(k.getRow(), (Text) null);
@@ -161,47 +217,95 @@ public class TabletMetadata {
"Input contains more than one row : " + row + " " + k.getRowData());
}
- if (PREV_ROW_COLUMN.hasColumns(k)) {
- te.prevEndRow = KeyExtent.decodePrevEndRow(v);
- }
- if (fam.equals(DataFileColumnFamily.NAME)) {
- filesBuilder.add(k.getColumnQualifier().toString());
- } else if (fam.equals(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME)) {
- loadedFilesBuilder.add(k.getColumnQualifier().toString());
- } else if (fam.equals(CurrentLocationColumnFamily.NAME)) {
- if (te.location != null) {
- throw new IllegalArgumentException(
- "Input contains more than one location " + te.location + " " + v);
- }
- te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.CURRENT);
- } else if (fam.equals(FutureLocationColumnFamily.NAME)) {
- if (te.location != null) {
- throw new IllegalArgumentException(
- "Input contains more than one location " + te.location + " " + v);
- }
- te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.FUTURE);
- } else if (fam.equals(LastLocationColumnFamily.NAME)) {
- te.last = new Location(v.toString(), k.getColumnQualifierData().toString(),
- LocationType.LAST);
+ switch (fam.toString()) {
+ case TabletColumnFamily.STR_NAME:
+ if (PREV_ROW_COLUMN.hasColumns(k)) {
+ te.prevEndRow = KeyExtent.decodePrevEndRow(v);
+ te.sawPrevEndRow = true;
+ }
+ break;
+ case ServerColumnFamily.STR_NAME:
+ if (DIRECTORY_COLUMN.hasColumns(k)) {
+ te.dir = v.toString();
+ } else if (TIME_COLUMN.hasColumns(k)) {
+ te.time = v.toString();
+ }
+ break;
+ case DataFileColumnFamily.STR_NAME:
+ filesBuilder.add(k.getColumnQualifier().toString());
+ break;
+ case BulkFileColumnFamily.STR_NAME:
+ loadedFilesBuilder.add(k.getColumnQualifier().toString());
+ break;
+ case CurrentLocationColumnFamily.STR_NAME:
+ if (te.location != null) {
+ throw new IllegalArgumentException(
+ "Input contains more than one location " + te.location + " " + v);
+ }
+ te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
+ LocationType.CURRENT);
+ break;
+ case FutureLocationColumnFamily.STR_NAME:
+ if (te.location != null) {
+ throw new IllegalArgumentException(
+ "Input contains more than one location " + te.location + " " + v);
+ }
+ te.location = new Location(v.toString(), k.getColumnQualifierData().toString(),
+ LocationType.FUTURE);
+ break;
+ case LastLocationColumnFamily.STR_NAME:
+ te.last = new Location(v.toString(), k.getColumnQualifierData().toString(),
+ LocationType.LAST);
+ break;
+ case ScanFileColumnFamily.STR_NAME:
+ scansBuilder.add(k.getColumnQualifierData().toString());
+ break;
+ case ClonedColumnFamily.STR_NAME:
+ te.cloned = v.toString();
+ break;
+ default:
+ throw new IllegalStateException("Unexpected family " + fam);
}
}
te.files = filesBuilder.build();
te.loadedFiles = loadedFilesBuilder.build();
- te.fetchedColumns = fetchedColumns;
+ te.fetchedCols = fetchedColumns;
+ te.scans = scansBuilder.build();
+ if (buildKeyValueMap) {
+ te.keyValues = kvBuilder.build();
+ }
return te;
}
- public static Iterable<TabletMetadata> convert(Scanner input,
- EnumSet<FetchedColumns> fetchedColumns) {
- return new Iterable<TabletMetadata>() {
- @Override
- public Iterator<TabletMetadata> iterator() {
+ static Iterable<TabletMetadata> convert(Scanner input, EnumSet<FetchedColumns> fetchedColumns,
+ boolean checkConsistency, boolean buildKeyValueMap) {
+
+ Range range = input.getRange();
+
+ Function<Range,Iterator<TabletMetadata>> iterFactory = r -> {
+ synchronized (input) {
+ input.setRange(r);
RowIterator rowIter = new RowIterator(input);
- return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns));
+ return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns, buildKeyValueMap));
}
};
+
+ if (checkConsistency) {
+ return () -> new LinkingIterator(iterFactory, range);
+ } else {
+ return () -> iterFactory.apply(range);
+ }
+ }
+
+ @VisibleForTesting
+ static TabletMetadata create(String id, String prevEndRow, String endRow) {
+ TabletMetadata te = new TabletMetadata();
+ te.tableId = Table.ID.of(id);
+ te.sawPrevEndRow = true;
+ te.prevEndRow = prevEndRow == null ? null : new Text(prevEndRow);
+ te.endRow = endRow == null ? null : new Text(endRow);
+ te.fetchedCols = EnumSet.of(FetchedColumns.PREV_ROW);
+ return te;
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 9df6cef..0799510 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -38,7 +38,6 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -162,11 +161,12 @@ public class Gatherer {
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx)
- .overUserTableId(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast()
- .fetchPrev().build();
+ .overRange(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast().fetchPrev()
+ .build();
// get a subset of files
Map<String,List<TabletMetadata>> files = new HashMap<>();
+
for (TabletMetadata tm : tmi) {
for (String file : tm.getFiles()) {
if (fileSelector.test(file)) {
@@ -522,10 +522,8 @@ public class Gatherer {
private int countFiles()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
// TODO use a batch scanner + iterator to parallelize counting files
- Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx)
- .overUserTableId(tableId, startRow, endRow).fetchFiles().fetchPrev().build();
- return StreamSupport.stream(tmi.spliterator(), false).mapToInt(tm -> tm.getFiles().size())
- .sum();
+ return MetadataScanner.builder().from(ctx).overRange(tableId, startRow, endRow).fetchFiles()
+ .fetchPrev().build().stream().mapToInt(tm -> tm.getFiles().size()).sum();
}
private class GatherRequest implements Supplier<SummaryCollection> {
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java
new file mode 100644
index 0000000..a12297b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.create;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class LinkingIteratorTest {
+
+ private static class IterFactory implements Function<Range,Iterator<TabletMetadata>> {
+ private int count;
+ private List<TabletMetadata> initial;
+ private List<TabletMetadata> subsequent;
+
+ IterFactory(List<TabletMetadata> initial, List<TabletMetadata> subsequent) {
+ this.initial = initial;
+ this.subsequent = subsequent;
+ count = 0;
+ }
+
+ @Override
+ public Iterator<TabletMetadata> apply(Range range) {
+ Stream<TabletMetadata> stream = count++ == 0 ? initial.stream() : subsequent.stream();
+ return stream.filter(tm -> range.contains(new Key(tm.getExtent().getMetadataEntry())))
+ .iterator();
+ }
+ }
+
+ private static void check(List<TabletMetadata> expected, IterFactory iterFactory) {
+ List<KeyExtent> actual = new ArrayList<>();
+ new LinkingIterator(iterFactory, new Range())
+ .forEachRemaining(tm -> actual.add(tm.getExtent()));
+ Assert.assertEquals(Lists.transform(expected, TabletMetadata::getExtent), actual);
+ }
+
+ @Test
+ public void testHole() {
+
+ List<TabletMetadata> tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"),
+ create("4", "r", "x"), create("4", "x", null));
+ List<TabletMetadata> tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"),
+ create("4", "m", "r"), create("4", "r", "x"), create("4", "x", null));
+
+ check(tablets2, new IterFactory(tablets1, tablets2));
+ }
+
+ @Test(expected = TabletDeletedException.class)
+ public void testMerge() {
+ // test for case when a tablet is merged away
+ List<TabletMetadata> tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"),
+ create("4", "f", "r"), create("4", "x", null));
+ List<TabletMetadata> tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "r"),
+ create("4", "r", "x"), create("4", "x", null));
+
+ LinkingIterator li = new LinkingIterator(new IterFactory(tablets1, tablets2), new Range());
+
+ while (li.hasNext()) {
+ li.next();
+ }
+ }
+
+ @Test
+ public void testBadTableTransition1() {
+ // test when last tablet in table does not have null end row
+ List<TabletMetadata> tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"),
+ create("5", null, null));
+ List<TabletMetadata> tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"),
+ create("4", "m", null), create("5", null, null));
+
+ check(tablets2, new IterFactory(tablets1, tablets2));
+ }
+
+ @Test
+ public void testBadTableTransition2() {
+ // test when first tablet in table does not have null prev end row
+ List<TabletMetadata> tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", null),
+ create("5", "h", null));
+ List<TabletMetadata> tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", null),
+ create("5", null, "h"), create("5", "h", null));
+
+ check(tablets2, new IterFactory(tablets1, tablets2));
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index d93de1b..efaf6f0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
@@ -70,6 +71,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -714,15 +717,12 @@ public class MetadataTableUtil {
}
}
- private static void getFiles(Set<String> files, Map<Key,Value> tablet, Table.ID srcTableId) {
- for (Entry<Key,Value> entry : tablet.entrySet()) {
- if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- String cf = entry.getKey().getColumnQualifier().toString();
- if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) {
- cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
- }
- files.add(cf);
+ private static void getFiles(Set<String> files, List<String> tabletFiles, Table.ID srcTableId) {
+ for (String file : tabletFiles) {
+ if (srcTableId != null && !file.startsWith("../") && !file.contains(":")) {
+ file = "../" + srcTableId + file;
}
+ files.add(file);
}
}
@@ -753,37 +753,43 @@ public class MetadataTableUtil {
return m;
}
- private static Scanner createCloneScanner(String tableName, Table.ID tableId, Connector conn)
- throws TableNotFoundException {
- if (tableId.equals(MetadataTable.ID))
+ private static Iterable<TabletMetadata> createCloneScanner(String testTableName, Table.ID tableId,
+ Connector conn) throws TableNotFoundException {
+
+ String tableName;
+ Range range;
+
+ if (testTableName != null) {
+ tableName = testTableName;
+ range = TabletsSection.getRange(tableId);
+ } else if (tableId.equals(MetadataTable.ID)) {
tableName = RootTable.NAME;
- Scanner mscanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
- mscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- mscanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
- mscanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME);
- mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(mscanner);
- TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(mscanner);
- return mscanner;
+ range = TabletsSection.getRange();
+ } else {
+ tableName = MetadataTable.NAME;
+ range = TabletsSection.getRange(tableId);
+ }
+
+ try {
+ return MetadataScanner.builder().from(conn).scanTable(tableName).overRange(range)
+ .checkConsistency().saveKeyValues().fetchFiles().fetchLocation().fetchLast().fetchCloned()
+ .fetchPrev().fetchTime().build();
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
}
@VisibleForTesting
- public static void initializeClone(String tableName, Table.ID srcTableId, Table.ID tableId,
+ public static void initializeClone(String testTableName, Table.ID srcTableId, Table.ID tableId,
Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException {
- TabletIterator ti;
- if (srcTableId.equals(MetadataTable.ID))
- ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), new Range(), true,
- true);
- else
- ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn),
- new KeyExtent(srcTableId, null, null).toMetadataRange(), true, true);
+
+ Iterator<TabletMetadata> ti = createCloneScanner(testTableName, srcTableId, conn).iterator();
if (!ti.hasNext())
throw new RuntimeException(" table deleted during clone? srcTableId = " + srcTableId);
while (ti.hasNext())
- bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next()));
+ bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next().getKeyValues()));
bw.flush();
}
@@ -794,12 +800,13 @@ public class MetadataTableUtil {
}
@VisibleForTesting
- public static int checkClone(String tableName, Table.ID srcTableId, Table.ID tableId,
+ public static int checkClone(String testTableName, Table.ID srcTableId, Table.ID tableId,
Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException {
- TabletIterator srcIter = new TabletIterator(createCloneScanner(tableName, srcTableId, conn),
- new KeyExtent(srcTableId, null, null).toMetadataRange(), true, true);
- TabletIterator cloneIter = new TabletIterator(createCloneScanner(tableName, tableId, conn),
- new KeyExtent(tableId, null, null).toMetadataRange(), true, true);
+
+ Iterator<TabletMetadata> srcIter = createCloneScanner(testTableName, srcTableId, conn)
+ .iterator();
+ Iterator<TabletMetadata> cloneIter = createCloneScanner(testTableName, tableId, conn)
+ .iterator();
if (!cloneIter.hasNext() || !srcIter.hasNext())
throw new RuntimeException(
@@ -808,50 +815,40 @@ public class MetadataTableUtil {
int rewrites = 0;
while (cloneIter.hasNext()) {
- Map<Key,Value> cloneTablet = cloneIter.next();
- Text cloneEndRow = new KeyExtent(cloneTablet.keySet().iterator().next().getRow(), (Text) null)
- .getEndRow();
+ TabletMetadata cloneTablet = cloneIter.next();
+ Text cloneEndRow = cloneTablet.getEndRow();
HashSet<String> cloneFiles = new HashSet<>();
- boolean cloneSuccessful = false;
- for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
- if (entry.getKey().getColumnFamily().equals(ClonedColumnFamily.NAME)) {
- cloneSuccessful = true;
- break;
- }
- }
+ boolean cloneSuccessful = cloneTablet.getCloned() != null;
if (!cloneSuccessful)
- getFiles(cloneFiles, cloneTablet, null);
+ getFiles(cloneFiles, cloneTablet.getFiles(), null);
- List<Map<Key,Value>> srcTablets = new ArrayList<>();
- Map<Key,Value> srcTablet = srcIter.next();
+ List<TabletMetadata> srcTablets = new ArrayList<>();
+ TabletMetadata srcTablet = srcIter.next();
srcTablets.add(srcTablet);
- Text srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null)
- .getEndRow();
-
+ Text srcEndRow = srcTablet.getEndRow();
int cmp = compareEndRows(cloneEndRow, srcEndRow);
if (cmp < 0)
- throw new TabletIterator.TabletDeletedException(
+ throw new TabletDeletedException(
"Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
HashSet<String> srcFiles = new HashSet<>();
if (!cloneSuccessful)
- getFiles(srcFiles, srcTablet, srcTableId);
+ getFiles(srcFiles, srcTablet.getFiles(), srcTableId);
while (cmp > 0) {
srcTablet = srcIter.next();
srcTablets.add(srcTablet);
- srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null)
- .getEndRow();
+ srcEndRow = srcTablet.getEndRow();
cmp = compareEndRows(cloneEndRow, srcEndRow);
if (cmp < 0)
- throw new TabletIterator.TabletDeletedException(
+ throw new TabletDeletedException(
"Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
if (!cloneSuccessful)
- getFiles(srcFiles, srcTablet, srcTableId);
+ getFiles(srcFiles, srcTablet.getFiles(), srcTableId);
}
if (cloneSuccessful)
@@ -859,22 +856,22 @@ public class MetadataTableUtil {
if (!srcFiles.containsAll(cloneFiles)) {
// delete existing cloned tablet entry
- Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
+ Mutation m = new Mutation(cloneTablet.getExtent().getMetadataEntry());
- for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
+ for (Entry<Key,Value> entry : cloneTablet.getKeyValues().entrySet()) {
Key k = entry.getKey();
m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
}
bw.addMutation(m);
- for (Map<Key,Value> st : srcTablets)
- bw.addMutation(createCloneMutation(srcTableId, tableId, st));
+ for (TabletMetadata st : srcTablets)
+ bw.addMutation(createCloneMutation(srcTableId, tableId, st.getKeyValues()));
rewrites++;
} else {
// write out marker that this tablet was successfully cloned
- Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
+ Mutation m = new Mutation(cloneTablet.getExtent().getMetadataEntry());
m.put(ClonedColumnFamily.NAME, new Text(""), new Value("OK".getBytes(UTF_8)));
bw.addMutation(m);
}
@@ -893,13 +890,13 @@ public class MetadataTableUtil {
while (true) {
try {
- initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ initializeClone(null, srcTableId, tableId, conn, bw);
// the following loop looks for changes in the files that occurred during the copy. If
// files were dereferenced, then they could have been GCed
while (true) {
- int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+ int rewrites = checkClone(null, srcTableId, tableId, conn, bw);
if (rewrites == 0)
break;
@@ -908,7 +905,7 @@ public class MetadataTableUtil {
bw.flush();
break;
- } catch (TabletIterator.TabletDeletedException tde) {
+ } catch (TabletDeletedException tde) {
// tablets were merged in the src table
bw.flush();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java
deleted file mode 100644
index e65fd71..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.Table;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterators;
-
-/**
- * This class iterates over the metadata table returning all key values for a tablet in one chunk.
- * As it scans the metadata table it checks the correctness of the metadata table, and rescans if
- * needed. So the tablet key/values returned by this iterator should satisfy the sorted linked list
- * property of the metadata table.
- *
- * The purpose of this is to hide inconsistencies caused by splits and detect anomalies in the
- * metadata table.
- *
- * If a tablet that was returned by this iterator is subsequently deleted from the metadata table,
- * then this iterator will throw a TabletDeletedException. This could occur when a table is merged.
- */
-public class TabletIterator implements Iterator<Map<Key,Value>> {
-
- private static final Logger log = LoggerFactory.getLogger(TabletIterator.class);
-
- private SortedMap<Key,Value> currentTabletKeys;
-
- private Text lastTablet;
-
- private Scanner scanner;
- private Iterator<Entry<Key,Value>> iter;
-
- private boolean returnPrevEndRow;
-
- private boolean returnDir;
-
- private Range range;
-
- public static class TabletDeletedException extends RuntimeException {
-
- private static final long serialVersionUID = 1L;
-
- public TabletDeletedException(String msg) {
- super(msg);
- }
- }
-
- /**
- *
- * @param s
- * A scanner over the entire metadata table configure to fetch needed columns.
- */
- public TabletIterator(Scanner s, Range range, boolean returnPrevEndRow, boolean returnDir) {
- this.scanner = s;
- this.range = range;
- this.scanner.setRange(range);
- TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- this.iter = s.iterator();
- this.returnPrevEndRow = returnPrevEndRow;
- this.returnDir = returnDir;
- }
-
- @Override
- public boolean hasNext() {
- while (currentTabletKeys == null) {
-
- currentTabletKeys = scanToPrevEndRow();
- if (currentTabletKeys.size() == 0) {
- break;
- }
-
- Key prevEndRowKey = currentTabletKeys.lastKey();
- Value prevEndRowValue = currentTabletKeys.get(prevEndRowKey);
-
- if (!TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(prevEndRowKey)) {
- log.debug("{}", currentTabletKeys);
- throw new RuntimeException("Unexpected key " + prevEndRowKey);
- }
-
- Text per = KeyExtent.decodePrevEndRow(prevEndRowValue);
- Text lastEndRow;
-
- if (lastTablet == null) {
- lastEndRow = null;
- } else {
- lastEndRow = new KeyExtent(lastTablet, (Text) null).getEndRow();
-
- // do table transition sanity check
- Table.ID lastTable = new KeyExtent(lastTablet, (Text) null).getTableId();
- Table.ID currentTable = new KeyExtent(prevEndRowKey.getRow(), (Text) null).getTableId();
-
- if (!lastTable.equals(currentTable) && (per != null || lastEndRow != null)) {
- log.info("Metadata inconsistency on table transition : {} {} {} {}", lastTable,
- currentTable, per, lastEndRow);
-
- currentTabletKeys = null;
- resetScanner();
-
- sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
-
- continue;
- }
- }
-
- boolean perEqual = (per == null && lastEndRow == null)
- || (per != null && lastEndRow != null && per.equals(lastEndRow));
-
- if (!perEqual) {
-
- log.info("Metadata inconsistency : {} != {} metadataKey = {}", per, lastEndRow,
- prevEndRowKey);
-
- currentTabletKeys = null;
- resetScanner();
-
- sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
-
- continue;
-
- }
- // this tablet is good, so set it as the last tablet
- lastTablet = prevEndRowKey.getRow();
- }
-
- return currentTabletKeys.size() > 0;
- }
-
- @Override
- public Map<Key,Value> next() {
-
- if (!hasNext())
- throw new NoSuchElementException();
-
- Map<Key,Value> tmp = currentTabletKeys;
- currentTabletKeys = null;
-
- Set<Entry<Key,Value>> es = tmp.entrySet();
- Iterator<Entry<Key,Value>> esIter = es.iterator();
-
- while (esIter.hasNext()) {
- Map.Entry<Key,Value> entry = esIter.next();
- if (!returnPrevEndRow
- && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
- esIter.remove();
- }
-
- if (!returnDir
- && TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(entry.getKey())) {
- esIter.remove();
- }
- }
-
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private SortedMap<Key,Value> scanToPrevEndRow() {
-
- Text curMetaDataRow = null;
-
- TreeMap<Key,Value> tm = new TreeMap<>();
-
- boolean sawPrevEndRow = false;
-
- while (true) {
- while (iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
-
- if (curMetaDataRow == null) {
- curMetaDataRow = entry.getKey().getRow();
- }
-
- if (!curMetaDataRow.equals(entry.getKey().getRow())) {
- // tablet must not have a prev end row, try scanning again
- break;
- }
-
- tm.put(entry.getKey(), entry.getValue());
-
- if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
- sawPrevEndRow = true;
- break;
- }
- }
-
- if (!sawPrevEndRow && tm.size() > 0) {
- log.warn("Metadata problem : tablet {} has no prev end row", curMetaDataRow);
- resetScanner();
- curMetaDataRow = null;
- tm.clear();
- sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
- } else {
- break;
- }
- }
-
- return tm;
- }
-
- protected void resetScanner() {
-
- Range range;
-
- if (lastTablet == null) {
- range = this.range;
- } else {
- // check to see if the last tablet still exist
- range = new Range(lastTablet, true, lastTablet, true);
- scanner.setRange(range);
- int count = Iterators.size(scanner.iterator());
-
- if (count == 0)
- throw new TabletDeletedException("Tablet " + lastTablet + " was deleted while iterating");
-
- // start right after the last good tablet
- range = new Range(new Key(lastTablet).followingKey(PartialKey.ROW), true,
- this.range.getEndKey(), this.range.isEndKeyInclusive());
- }
-
- log.info("Resetting {} scanner to {}", MetadataTable.NAME, range);
-
- scanner.setRange(range);
- iter = scanner.iterator();
-
- }
-
-}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 04f0c1a..7733298 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -33,18 +33,12 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Table;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.gc.GarbageCollectionEnvironment.Reference;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,21 +164,17 @@ public class GarbageCollectionAlgorithm {
}
- Iterator<Entry<Key,Value>> iter = gce.getReferenceIterator();
+ Iterator<Reference> iter = gce.getReferences().iterator();
while (iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
- Key key = entry.getKey();
- Text cft = key.getColumnFamily();
-
- if (cft.equals(DataFileColumnFamily.NAME) || cft.equals(ScanFileColumnFamily.NAME)) {
- String cq = key.getColumnQualifier().toString();
-
- String reference = cq;
- if (cq.startsWith("/")) {
- String tableID = new String(KeyExtent.tableOfMetadataRow(key.getRow()));
- reference = "/" + tableID + cq;
- } else if (!cq.contains(":") && !cq.startsWith("../")) {
- throw new RuntimeException("Bad file reference " + cq);
+ Reference ref = iter.next();
+
+ if (!ref.isDir) {
+
+ String reference = ref.ref;
+ if (reference.startsWith("/")) {
+ reference = "/" + ref.id + reference;
+ } else if (!reference.contains(":") && !reference.startsWith("../")) {
+ throw new RuntimeException("Bad file reference " + reference);
}
reference = makeRelative(reference, 3);
@@ -198,9 +188,9 @@ public class GarbageCollectionAlgorithm {
if (candidateMap.remove(dir) != null)
log.debug("Candidate was still in use: {}", reference);
- } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
- String tableID = new String(KeyExtent.tableOfMetadataRow(key.getRow()));
- String dir = entry.getValue().toString();
+ } else {
+ String tableID = ref.id.toString();
+ String dir = ref.ref;
if (!dir.contains(":")) {
if (!dir.startsWith("/"))
throw new RuntimeException("Bad directory " + dir);
@@ -211,9 +201,7 @@ public class GarbageCollectionAlgorithm {
if (candidateMap.remove(dir) != null)
log.debug("Candidate was still in use: {}", dir);
- } else
- throw new RuntimeException(
- "Scanner over metadata table returned unexpected column : " + entry.getKey());
+ }
}
confirmDeletesFromReplication(gce.getReplicationNeededIterator(),
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
index f9abfa1..1a8ec59 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
@@ -23,11 +23,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.client.impl.Table.ID;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
@@ -62,6 +64,18 @@ public interface GarbageCollectionEnvironment {
Iterator<String> getBlipIterator()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+ static class Reference {
+ public final ID id;
+ public final String ref;
+ public final boolean isDir;
+
+ Reference(ID id, String ref, boolean isDir) {
+ this.id = id;
+ this.ref = ref;
+ this.isDir = isDir;
+ }
+ }
+
/**
* Fetches the references to files, {@link DataFileColumnFamily#NAME} or
* {@link ScanFileColumnFamily#NAME}, from tablets
@@ -69,7 +83,7 @@ public interface GarbageCollectionEnvironment {
* @return An {@link Iterator} of {@link Entry}<{@link Key}, {@link Value}> which constitute
* a reference to a file.
*/
- Iterator<Entry<Key,Value>> getReferenceIterator()
+ Stream<Reference> getReferences()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
/**
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 7ac9b59..3c60861 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -30,6 +30,7 @@ import java.util.SortedMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
@@ -58,10 +59,9 @@ import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
@@ -102,7 +102,6 @@ import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.TabletIterator;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -297,18 +296,23 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
}
@Override
- public Iterator<Entry<Key,Value>> getReferenceIterator()
+ public Stream<Reference> getReferences()
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- IsolatedScanner scanner = new IsolatedScanner(
- getConnector().createScanner(tableName, Authorizations.EMPTY));
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.fetchColumnFamily(ScanFileColumnFamily.NAME);
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
- TabletIterator tabletIterator = new TabletIterator(scanner,
- MetadataSchema.TabletsSection.getRange(), false, true);
-
- return Iterators
- .concat(Iterators.transform(tabletIterator, input -> input.entrySet().iterator()));
+
+ Stream<TabletMetadata> tabletStream = MetadataScanner.builder().from(getConnector())
+ .overTabletRange().checkConsistency().fetchDir().fetchFiles().fetchScans().build()
+ .stream();
+
+ Stream<Reference> refStream = tabletStream.flatMap(tm -> {
+ Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream())
+ .map(f -> new Reference(tm.getTableId(), f, false));
+ if (tm.getDir() != null) {
+ refs = Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDir(), true)));
+ }
+ return refs;
+ });
+
+ return refStream;
}
@Override
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index a719378..6bedd68 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -27,18 +27,13 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.impl.Table;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +41,7 @@ public class GarbageCollectionTest {
static class TestGCE implements GarbageCollectionEnvironment {
TreeSet<String> candidates = new TreeSet<>();
ArrayList<String> blips = new ArrayList<>();
- Map<Key,Value> references = new TreeMap<>();
+ Map<String,Reference> references = new TreeMap<>();
HashSet<Table.ID> tableIds = new HashSet<>();
ArrayList<String> deletes = new ArrayList<>();
@@ -69,8 +64,8 @@ public class GarbageCollectionTest {
}
@Override
- public Iterator<Entry<Key,Value>> getReferenceIterator() {
- return references.entrySet().iterator();
+ public Stream<Reference> getReferences() {
+ return references.values().stream();
}
@Override
@@ -89,41 +84,21 @@ public class GarbageCollectionTest {
tablesDirsToDelete.add(tableID);
}
- public Key newFileReferenceKey(String tableId, String endRow, String file) {
- String row = new KeyExtent(Table.ID.of(tableId), endRow == null ? null : new Text(endRow),
- null).getMetadataEntry().toString();
- String cf = MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString();
- return new Key(row, cf, file);
+ public void addFileReference(String tableId, String endRow, String file) {
+ references.put(tableId + ":" + endRow + ":" + file,
+ new Reference(Table.ID.of(tableId), file, false));
}
- public Value addFileReference(String tableId, String endRow, String file) {
- Key key = newFileReferenceKey(tableId, endRow, file);
- Value val = new Value(new DataFileValue(0, 0).encode());
- return references.put(key, val);
+ public void removeFileReference(String tableId, String endRow, String file) {
+ references.remove(tableId + ":" + endRow + ":" + file);
}
- public Value removeFileReference(String tableId, String endRow, String file) {
- return references.remove(newFileReferenceKey(tableId, endRow, file));
+ public void addDirReference(String tableId, String endRow, String dir) {
+ references.put(tableId + ":" + endRow, new Reference(Table.ID.of(tableId), dir, true));
}
- Key newDirReferenceKey(String tableId, String endRow) {
- String row = new KeyExtent(Table.ID.of(tableId), endRow == null ? null : new Text(endRow),
- null).getMetadataEntry().toString();
- String cf = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
- .getColumnFamily().toString();
- String cq = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
- .getColumnQualifier().toString();
- return new Key(row, cf, cq);
- }
-
- public Value addDirReference(String tableId, String endRow, String dir) {
- Key key = newDirReferenceKey(tableId, endRow);
- Value val = new Value(dir.getBytes());
- return references.put(key, val);
- }
-
- public Value removeDirReference(String tableId, String endRow) {
- return references.remove(newDirReferenceKey(tableId, endRow));
+ public void removeDirReference(String tableId, String endRow) {
+ references.remove(tableId + ":" + endRow);
}
@Override
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 41aa3d8..3511c82 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationTable;
@@ -92,7 +93,6 @@ import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretM
import org.apache.accumulo.server.util.NamespacePropUtil;
import org.apache.accumulo.server.util.SystemPropUtil;
import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index bfd7fdd..1baac99 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -107,11 +107,10 @@ class LoadFiles extends MasterRepo {
Text startRow = loadMapEntry.getKey().getPrevEndRow();
- Iterable<TabletMetadata> tableMetadata = MetadataScanner.builder().from(master)
- .overUserTableId(tableId, startRow, null).fetchPrev().fetchLocation().fetchLoaded().build();
-
long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- Iterator<TabletMetadata> tabletIter = tableMetadata.iterator();
+ Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master)
+ .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
+ .fetchLoaded().build().iterator();
List<TabletMetadata> tablets = new ArrayList<>();
TabletMetadata currentTablet = tabletIter.next();
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index 345aa20..18ddab9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -166,9 +166,9 @@ public class PrepBulkImport extends MasterRepo {
Iterators.transform(lmi, entry -> entry.getKey());
TabletIterFactory tabletIterFactory = startRow -> {
- Iterable<TabletMetadata> tableMetadata = MetadataScanner.builder().from(master)
- .overUserTableId(bulkInfo.tableId, startRow, null).build();
- return Iterators.transform(tableMetadata.iterator(), tm -> tm.getExtent());
+ return MetadataScanner.builder().from(master).overRange(bulkInfo.tableId, startRow, null)
+ .checkConsistency().fetchPrev().build().stream().map(TabletMetadata::getExtent)
+ .iterator();
};
checkForMerge(bulkInfo.tableId.canonicalID(),
diff --git a/test/src/main/java/org/apache/accumulo/test/CloneIT.java b/test/src/main/java/org/apache/accumulo/test/CloneIT.java
index 0c4c6a9..aa1b9b3 100644
--- a/test/src/main/java/org/apache/accumulo/test/CloneIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/CloneIT.java
@@ -35,10 +35,10 @@ import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.util.TabletIterator;
import org.apache.hadoop.io.Text;
import org.junit.Test;
@@ -389,8 +389,6 @@ public class CloneIT extends AccumuloClusterHarness {
try {
MetadataTableUtil.checkClone(tableName, Table.ID.of("0"), Table.ID.of("1"), conn, bw2);
fail();
- } catch (TabletIterator.TabletDeletedException tde) {}
-
+ } catch (TabletDeletedException tde) {}
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
index 68a4b36..d585549 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
@@ -27,28 +27,18 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
-import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Merge;
import org.apache.accumulo.harness.AccumuloClusterHarness;
-import org.apache.accumulo.server.util.TabletIterator;
-import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
import org.apache.hadoop.io.Text;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
public class MergeIT extends AccumuloClusterHarness {
@@ -217,72 +207,4 @@ public class MergeIT extends AccumuloClusterHarness {
}
}
}
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- private static class TestTabletIterator extends TabletIterator {
-
- private final Connector conn;
- private final String metadataTableName;
-
- public TestTabletIterator(Connector conn, String metadataTableName) throws Exception {
- super(conn.createScanner(metadataTableName, Authorizations.EMPTY),
- MetadataSchema.TabletsSection.getRange(), true, true);
- this.conn = conn;
- this.metadataTableName = metadataTableName;
- }
-
- @Override
- protected void resetScanner() {
- try (Scanner ds = conn.createScanner(metadataTableName, Authorizations.EMPTY)) {
-
- Text tablet = new KeyExtent(Table.ID.of("0"), new Text("m"), null).getMetadataEntry();
- ds.setRange(new Range(tablet, true, tablet, true));
-
- Mutation m = new Mutation(tablet);
-
- BatchWriter bw = conn.createBatchWriter(metadataTableName, new BatchWriterConfig());
- for (Entry<Key,Value> entry : ds) {
- Key k = entry.getKey();
- m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
- }
- bw.addMutation(m);
- bw.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- super.resetScanner();
- }
-
- }
-
- // simulate a merge happening while iterating over tablets
- @Test
- public void testMerge() throws Exception {
- // create a fake metadata table
- String metadataTableName = getUniqueNames(1)[0];
- getConnector().tableOperations().create(metadataTableName);
-
- KeyExtent ke1 = new KeyExtent(Table.ID.of("0"), new Text("m"), null);
- Mutation mut1 = ke1.getPrevRowUpdateMutation();
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut1, new Value("/d1".getBytes()));
-
- KeyExtent ke2 = new KeyExtent(Table.ID.of("0"), null, null);
- Mutation mut2 = ke2.getPrevRowUpdateMutation();
- TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut2, new Value("/d2".getBytes()));
-
- BatchWriter bw1 = getConnector().createBatchWriter(metadataTableName, new BatchWriterConfig());
- bw1.addMutation(mut1);
- bw1.addMutation(mut2);
- bw1.close();
-
- TestTabletIterator tabIter = new TestTabletIterator(getConnector(), metadataTableName);
-
- exception.expect(TabletDeletedException.class);
- while (tabIter.hasNext()) {
- tabIter.next();
- }
- }
}
--
To stop receiving notification emails like this one, please contact
kturner@apache.org.