You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2022/10/25 22:24:31 UTC
[accumulo] branch 2.1 updated: Prevents bulk import from hanging. (#3044)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d0d7b585ea Prevents bulk import from hanging. (#3044)
d0d7b585ea is described below
commit d0d7b585ea7bace2f86d5168058aaf5f33eda69c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Oct 25 23:24:24 2022 +0100
Prevents bulk import from hanging. (#3044)
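Fixes a bug where bulk import would hang when the first row of the file was
equal to the last row of the first tablet. The tablet lookup used an
exclusive start row, so the tablet whose end row equals the row being looked
up was skipped and the lookup never found the tablet containing that row.
The lookup now uses an inclusive start row.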
---
.../clientImpl/bulk/ConcurrentKeyExtentCache.java | 6 +++--
.../core/metadata/schema/TabletsMetadata.java | 24 ++++++++++++++++++--
.../apache/accumulo/test/functional/BulkNewIT.java | 26 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
index ec154e7dbe..3f5763c10b 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
@@ -88,13 +88,15 @@ class ConcurrentKeyExtentCache implements KeyExtentCache {
@VisibleForTesting
protected Stream<KeyExtent> lookupExtents(Text row) {
- return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row, null).checkConsistency()
- .fetch(PREV_ROW).build().stream().limit(100).map(TabletMetadata::getExtent);
+ return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row, true, null)
+ .checkConsistency().fetch(PREV_ROW).build().stream().limit(100)
+ .map(TabletMetadata::getExtent);
}
@Override
public KeyExtent lookup(Text row) {
while (true) {
+
KeyExtent ke = getFromCache(row);
if (ke != null)
return ke;
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 36caa39e90..e586b1a6ea 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -356,12 +356,19 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
}
@Override
- public Options overlapping(Text startRow, Text endRow) {
- this.range = new KeyExtent(tableId, null, startRow).toMetaRange();
+ public Options overlapping(Text startRow, boolean startInclusive, Text endRow) {
+ var encRow = TabletsSection.encodeRow(tableId, startRow == null ? new Text("") : startRow);
+ this.range = new Range(encRow, startRow == null ? true : startInclusive, null, true);
this.endRow = endRow;
+
return this;
}
+ @Override
+ public Options overlapping(Text startRow, Text endRow) {
+ return overlapping(startRow, false, endRow);
+ }
+
@Override
public Options saveKeyValues() {
this.saveKeyValues = true;
@@ -465,8 +472,21 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
* Limit to tablets that overlap the range {@code (startRow, endRow]}. Can pass null
* representing -inf and +inf. The impl creates open ended ranges which may be problematic, see
* #813.
+ *
+ * <p>
+ * This method is equivalent to calling {@link #overlapping(Text, boolean, Text)} as
+ * {@code overlapping(startRow, false, endRow)}
+ * </p>
*/
Options overlapping(Text startRow, Text endRow);
+
+ /**
+ * When {@code startRowInclusive} is true limits to tablets that overlap the range
+ * {@code [startRow,endRow]}. When {@code startRowInclusive} is false limits to tablets that
+ * overlap the range {@code (startRow, endRow]}. Can pass null for start and end row
+ * representing -inf and +inf.
+ */
+ Options overlapping(Text startRow, boolean startRowInclusive, Text endRow);
}
private static class TabletMetadataIterator implements Iterator<TabletMetadata> {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 927f17993c..afa0afc677 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -480,6 +480,32 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ /*
+ * This test imports a file where the first row of the file is equal to the last row of the first
+ * tablet. There was a bug where this scenario would cause bulk import to hang forever.
+ */
+ @Test
+ public void testEndOfFirstTablet() throws Exception {
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ addSplits(c, tableName, "0333");
+
+ var h1 = writeData(dir + "/f1.", aconf, 333, 333);
+
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+
+ verifyData(c, tableName, 333, 333, false);
+
+ Map<String,Set<String>> hashes = new HashMap<>();
+ hashes.put("0333", Set.of(h1));
+ hashes.put("null", Set.of());
+ verifyMetadata(c, tableName, hashes);
+ }
+ }
+
private void addSplits(AccumuloClient client, String tableName, String splitString)
throws Exception {
SortedSet<Text> splits = new TreeSet<>();