You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2024/01/19 18:30:16 UTC

(accumulo) branch elasticity updated: Avoids buffering large split points in memory (#4173)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f6c096bef1 Avoids buffering large split points in memory (#4173)
f6c096bef1 is described below

commit f6c096bef1c7809b698a9ed9b39d1f625c1e0ee3
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Jan 19 13:30:10 2024 -0500

    Avoids buffering large split points in memory (#4173)
    
    When a tablet needs to split, a set of split points is found. The length of each split point must be under a maximum end row size threshold. The code that found these split points buffered every candidate in memory and then removed any that were over the threshold. This commit avoids buffering anything that is over the threshold in the first place.
    
    Noticed this behavior while debugging #4172
---
 .../apache/accumulo/manager/split/SplitUtils.java  |  35 ++++---
 .../accumulo/manager/split/SplitUtilsTest.java     | 107 ++++++++++++---------
 2 files changed, 84 insertions(+), 58 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java
index 8f1ac57729..bce10badb2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.Predicate;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -218,19 +219,19 @@ public class SplitUtils {
 
     try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(),
         tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) {
-      var splits = findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold));
 
-      splits.removeIf(split -> {
-        if (split.getLength() >= maxEndRowSize) {
+      Predicate<ByteSequence> splitPredicate = splitCandidate -> {
+        if (splitCandidate.length() >= maxEndRowSize) {
           log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(),
-              split.getLength());
-          return true;
+              splitCandidate.length());
+          return false;
         }
 
-        return false;
-      });
+        return true;
+      };
 
-      return splits;
+      return findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold),
+          splitPredicate);
     }
   }
 
@@ -243,7 +244,8 @@ public class SplitUtils {
     return common;
   }
 
-  public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits) {
+  public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits,
+      Predicate<ByteSequence> rowPredicate) {
     Preconditions.checkArgument(desiredSplits >= 1);
 
     int numKeys = Iterables.size(tabletIndexIterator);
@@ -263,17 +265,24 @@ public class SplitUtils {
       }
 
       count++;
+
       if (count >= Math.round((splits.size() + 1) * interSplitDistance)) {
         if (prevRow == null) {
-          splits.add(key.getRow());
+          if (rowPredicate.test(key.getRowData())) {
+            splits.add(key.getRow());
+          }
         } else {
           var lcl = longestCommonLength(prevRow, key.getRowData());
           if (lcl + 1 >= key.getRowData().length()) {
-            splits.add(key.getRow());
+            if (rowPredicate.test(key.getRowData())) {
+              splits.add(key.getRow());
+            }
           } else {
-            splits.add(new Text(key.getRowData().subSequence(0, lcl + 1).toArray()));
+            var shortenedRow = key.getRowData().subSequence(0, lcl + 1);
+            if (rowPredicate.test(shortenedRow)) {
+              splits.add(new Text(shortenedRow.toArray()));
+            }
           }
-
         }
 
         if (splits.size() >= desiredSplits) {
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java
index 66684a5333..80258e4914 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java
@@ -33,9 +33,11 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -124,25 +126,26 @@ public class SplitUtilsTest {
     };
   }
 
+  public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits) {
+    return SplitUtils.findSplits(tabletIndexIterator, desiredSplits, sc -> true);
+  }
+
   @Test
   public void testFindSplits() {
     List<Key> keys = IntStream.range(1, 101).mapToObj(SplitUtilsTest::newKey).collect(toList());
-    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(33, 67), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3));
-    assertEquals(newRowsSet(20, 40, 60, 80), SplitUtils.findSplits(keys, 4));
-    assertEquals(newRowsSet(17, 33, 50, 67, 83), SplitUtils.findSplits(keys, 5));
-    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6));
-    assertEquals(newRowsSet(13, 25, 38, 50, 63, 75, 88), SplitUtils.findSplits(keys, 7));
-    assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10)),
-        SplitUtils.findSplits(keys, 9));
-    assertEquals(newRowsSet(IntStream.range(1, 20).map(i -> i * 5)),
-        SplitUtils.findSplits(keys, 19));
-    assertEquals(newRowsSet(IntStream.range(1, 50).map(i -> i * 2)),
-        SplitUtils.findSplits(keys, 49));
-    assertEquals(newRowsSet(IntStream.range(1, 100)), SplitUtils.findSplits(keys, 99));
-    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 100));
-    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 1000));
+    assertEquals(newRowsSet(50), findSplits(keys, 1));
+    assertEquals(newRowsSet(33, 67), findSplits(keys, 2));
+    assertEquals(newRowsSet(25, 50, 75), findSplits(keys, 3));
+    assertEquals(newRowsSet(20, 40, 60, 80), findSplits(keys, 4));
+    assertEquals(newRowsSet(17, 33, 50, 67, 83), findSplits(keys, 5));
+    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), findSplits(keys, 6));
+    assertEquals(newRowsSet(13, 25, 38, 50, 63, 75, 88), findSplits(keys, 7));
+    assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10)), findSplits(keys, 9));
+    assertEquals(newRowsSet(IntStream.range(1, 20).map(i -> i * 5)), findSplits(keys, 19));
+    assertEquals(newRowsSet(IntStream.range(1, 50).map(i -> i * 2)), findSplits(keys, 49));
+    assertEquals(newRowsSet(IntStream.range(1, 100)), findSplits(keys, 99));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 100));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 1000));
   }
 
   @Test
@@ -150,42 +153,42 @@ public class SplitUtilsTest {
     List<Key> keys = IntStream.range(1, 101).map(i -> i / 10 * 10).mapToObj(SplitUtilsTest::newKey)
         .collect(toList());
 
-    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(i -> i - 10, 30, 60), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowsSet(i -> i - 10, 20, 50, 70), SplitUtils.findSplits(keys, 3));
+    assertEquals(newRowsSet(50), findSplits(keys, 1));
+    assertEquals(newRowsSet(i -> i - 10, 30, 60), findSplits(keys, 2));
+    assertEquals(newRowsSet(i -> i - 10, 20, 50, 70), findSplits(keys, 3));
     assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10), i -> i - 10),
-        SplitUtils.findSplits(keys, 9));
+        findSplits(keys, 9));
     assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? null : i - 10),
-        SplitUtils.findSplits(keys, 19));
+        findSplits(keys, 19));
     assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? null : i - 10),
-        SplitUtils.findSplits(keys, 100));
+        findSplits(keys, 100));
   }
 
   @Test
   public void testIndexIterator() {
     Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101), null, null);
-    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3));
-    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6));
-    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 200));
+    assertEquals(newRowsSet(50), findSplits(keys, 1));
+    assertEquals(newRowsSet(25, 50, 75), findSplits(keys, 3));
+    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), findSplits(keys, 6));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 200));
 
     keys = newIndexIterable(IntStream.range(1, 101), null, 50);
-    assertEquals(newRowsSet(75), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(67, 83), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowsSet(60, 70, 80, 90), SplitUtils.findSplits(keys, 4));
-    assertEquals(newRowsSet(IntStream.range(51, 101)), SplitUtils.findSplits(keys, 60));
+    assertEquals(newRowsSet(75), findSplits(keys, 1));
+    assertEquals(newRowsSet(67, 83), findSplits(keys, 2));
+    assertEquals(newRowsSet(60, 70, 80, 90), findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(51, 101)), findSplits(keys, 60));
 
     keys = newIndexIterable(IntStream.range(1, 101), 50, null);
-    assertEquals(newRowsSet(25), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(17, 33), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowsSet(10, 20, 30, 40), SplitUtils.findSplits(keys, 4));
-    assertEquals(newRowsSet(IntStream.range(1, 51)), SplitUtils.findSplits(keys, 60));
+    assertEquals(newRowsSet(25), findSplits(keys, 1));
+    assertEquals(newRowsSet(17, 33), findSplits(keys, 2));
+    assertEquals(newRowsSet(10, 20, 30, 40), findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(1, 51)), findSplits(keys, 60));
 
     keys = newIndexIterable(IntStream.range(1, 101), 75, 25);
-    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowsSet(17 + 25, 33 + 25), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowsSet(35, 45, 55, 65), SplitUtils.findSplits(keys, 4));
-    assertEquals(newRowsSet(IntStream.range(26, 76)), SplitUtils.findSplits(keys, 60));
+    assertEquals(newRowsSet(50), findSplits(keys, 1));
+    assertEquals(newRowsSet(17 + 25, 33 + 25), findSplits(keys, 2));
+    assertEquals(newRowsSet(35, 45, 55, 65), findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(26, 76)), findSplits(keys, 60));
   }
 
   @Test
@@ -194,8 +197,8 @@ public class SplitUtilsTest {
     // range falls between two index keys.
     Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101).map(i -> i * 1000), 250, 150);
     assertFalse(keys.iterator().hasNext());
-    assertEquals(Set.of(), SplitUtils.findSplits(keys, 1));
-    assertEquals(Set.of(), SplitUtils.findSplits(keys, 2));
+    assertEquals(Set.of(), findSplits(keys, 1));
+    assertEquals(Set.of(), findSplits(keys, 2));
   }
 
   @Test
@@ -220,11 +223,25 @@ public class SplitUtilsTest {
     var keys = newIndexIterable(
         Stream.of("aa11", "aa112", "b", "bg45", "ct", "cz7882", "mn", "mn009", "mnrtssd", "mnz076"),
         null, null);
-    assertEquals(newRowSet("c"), SplitUtils.findSplits(keys, 1));
-    assertEquals(newRowSet("b", "m"), SplitUtils.findSplits(keys, 2));
-    assertEquals(newRowSet("b", "c", "mn0"), SplitUtils.findSplits(keys, 3));
-    assertEquals(newRowSet("aa112", "bg", "cz", "mn0"), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowSet("c"), findSplits(keys, 1));
+    assertEquals(newRowSet("b", "m"), findSplits(keys, 2));
+    assertEquals(newRowSet("b", "c", "mn0"), findSplits(keys, 3));
+    assertEquals(newRowSet("aa112", "bg", "cz", "mn0"), findSplits(keys, 4));
     assertEquals(newRowSet("aa11", "aa112", "b", "bg", "c", "cz", "m", "mn0", "mnr", "mnz"),
-        SplitUtils.findSplits(keys, 10));
+        findSplits(keys, 10));
+  }
+
+  @Test
+  public void testSplitFilter() {
+    List<Key> keys = IntStream.range(1, 101).mapToObj(SplitUtilsTest::newKey).collect(toList());
+    Predicate<ByteSequence> splitFilter = splitCandidate -> {
+      int i = Integer.parseInt(splitCandidate.toString());
+      return i % 3 != 0;
+    };
+
+    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1, splitFilter));
+    assertEquals(newRowsSet(34, 67), SplitUtils.findSplits(keys, 2, splitFilter));
+    assertEquals(newRowsSet(25, 50, 76), SplitUtils.findSplits(keys, 3, splitFilter));
+    assertEquals(newRowsSet(20, 40, 61, 80), SplitUtils.findSplits(keys, 4, splitFilter));
   }
 }