Posted to commits@kudu.apache.org by la...@apache.org on 2023/04/28 02:25:38 UTC

[kudu] branch master updated: [java] KUDU-3455 Reduce space complexity of hash partition pruning for in-list predicate

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b69dbeb6c [java] KUDU-3455 Reduce space complexity of hash partition pruning for in-list predicate
b69dbeb6c is described below

commit b69dbeb6c64d04a32ff0e9f7d59bed1fa8165124
Author: duyuqi <sh...@gmail.com>
AuthorDate: Wed Mar 1 15:48:29 2023 +0800

    [java] KUDU-3455 Reduce space complexity of hash partition pruning for in-list predicate
    
    The hash partition pruning logic for in-list predicates in the Kudu Java
    client has high space complexity and may cause the Java client to run out
    of memory. It also makes many deep copies of 'PartialRow' objects, which
    makes the current algorithm slow.
    
    This patch fixes these problems with a recursive, DFS-like algorithm that
    picks all value combinations across the in-list columns and releases
    PartialRow objects as soon as possible.
    
    The new algorithm also gains a good speedup by avoiding many heavy copies
    of 'PartialRow' objects. A performance test case shows that the new
    algorithm is around 100x faster than the old one in cases where the latter
    does not hit OOM.
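
    As a rough illustration (simplified names, not the actual patch code), the
    enumeration is a classic DFS with backtracking, visiting one combination at
    a time so that memory stays proportional to the number of hash columns
    rather than to the product of the in-list sizes:

        import java.util.List;
        import java.util.function.Consumer;

        class CombinationSketch {
          // Visits every combination of one value per column. The 'picked'
          // list is reused across the recursion, so only one partial
          // combination is alive at any time.
          static void visitCombinations(List<List<byte[]>> valuesPerColumn,
                                        List<byte[]> picked,
                                        Consumer<List<byte[]>> visitor) {
            if (picked.size() == valuesPerColumn.size()) {
              visitor.accept(picked);  // one complete combination
              return;
            }
            for (byte[] value : valuesPerColumn.get(picked.size())) {
              picked.add(value);       // pick a value for the next column
              visitCombinations(valuesPerColumn, picked, visitor);
              picked.remove(picked.size() - 1);  // backtrack
            }
          }
        }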
    
    As Yifan Zhang pointed out during code review, the same memory problem is
    present in the C++ client. I'll study it later and fix it in another patch.
    
    Change-Id: Icd6f213cb705e1b2a001562cc7cebe4164281723
    Reviewed-on: http://gerrit.cloudera.org:8080/19568
    Reviewed-by: Yuqi Du <sh...@gmail.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Yifan Zhang <ch...@163.com>
    Reviewed-by: Yingchun Lai <la...@apache.org>
---
 .../org/apache/kudu/client/PartitionPruner.java    |  91 +++++--
 .../apache/kudu/client/TestPartitionPruner.java    | 282 +++++++++++++++++++++
 2 files changed, 354 insertions(+), 19 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
index 5b74d6e9e..f0a8e7649 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
@@ -345,6 +345,10 @@ public class PartitionPruner {
     return true;
   }
 
+  static List<Integer> idsToIndexesForTest(Schema schema, List<Integer> ids) {
+    return idsToIndexes(schema, ids);
+  }
+
   private static List<Integer> idsToIndexes(Schema schema, List<Integer> ids) {
     List<Integer> indexes = new ArrayList<>(ids.size());
     for (int id : ids) {
@@ -635,8 +639,17 @@ public class PartitionPruner {
     return result;
   }
 
+  // Exposed only for tests.
+  static BitSet pruneHashComponentV2ForTest(Schema schema,
+      PartitionSchema.HashBucketSchema hashSchema,
+      Map<String, KuduPredicate> predicates) {
+    return pruneHashComponent(schema, hashSchema, predicates);
+  }
+
   /**
-   * Search all combination of in-list and equality predicates for pruneable hash partitions.
+   * Search all combinations of in-list and equality predicates for prunable hash partitions.
+   * The method is an optimized version of 'TestPartitionPruner::pruneHashComponent'
+   * in terms of space complexity.
    * @return a bitset containing {@code false} bits for hash buckets which may be pruned
    */
   private static BitSet pruneHashComponent(Schema schema,
@@ -644,6 +657,7 @@ public class PartitionPruner {
                                            Map<String, KuduPredicate> predicates) {
     BitSet hashBuckets = new BitSet(hashSchema.getNumBuckets());
     List<Integer> columnIdxs = idsToIndexes(schema, hashSchema.getColumnIds());
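+    // One list of candidate values per hash column, in columnIdxs order
+    // (filled in by the loop below).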
+    List<List<byte[]>> predicateValueList = new ArrayList<>();
     for (int idx : columnIdxs) {
       ColumnSchema column = schema.getColumnByIndex(idx);
       KuduPredicate predicate = predicates.get(column.getName());
@@ -653,34 +667,73 @@ public class PartitionPruner {
         hashBuckets.set(0, hashSchema.getNumBuckets());
         return hashBuckets;
       }
-    }
 
-    List<PartialRow> rows = Arrays.asList(schema.newPartialRow());
-    for (int idx : columnIdxs) {
-      List<PartialRow> newRows = new ArrayList<>();
-      ColumnSchema column = schema.getColumnByIndex(idx);
-      KuduPredicate predicate = predicates.get(column.getName());
       List<byte[]> predicateValues;
       if (predicate.getType() == KuduPredicate.PredicateType.EQUALITY) {
         predicateValues = Collections.singletonList(predicate.getLower());
       } else {
         predicateValues = Arrays.asList(predicate.getInListValues());
       }
-      // For each of the encoded string, replicate it by the number of values in
-      // equality and in-list predicate.
-      for (PartialRow row : rows) {
-        for (byte[] predicateValue : predicateValues) {
-          PartialRow newRow = new PartialRow(row);
-          newRow.setRaw(idx, predicateValue);
-          newRows.add(newRow);
-        }
-      }
-      rows = newRows;
+      predicateValueList.add(predicateValues);
+    }
+    List<byte[]> valuesCombination = new ArrayList<>();
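+    // Enumerate the combinations recursively, one at a time, instead of
+    // materializing them all.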
+    computeHashBuckets(schema, hashSchema, hashBuckets,
+                       columnIdxs, predicateValueList, valuesCombination);
+    return hashBuckets;
+  }
+
+  /**
+   * Picks all combinations and computes their hash buckets.
+   * @param schema the table schema
+   * @param hashSchema the hash partition schema
+   * @param hashBuckets the result of this algorithm; a 0 bit means the partition can be pruned
+   * @param columnIdxs column indexes of the columns in the hash partition schema
+   * @param predicateValueList the predicate values of these columns, one list per column
+   * @param valuesCombination the combination of predicate values picked so far
+   */
+  private static void computeHashBuckets(Schema schema,
+                                         PartitionSchema.HashBucketSchema hashSchema,
+                                         BitSet hashBuckets,
+                                         List<Integer> columnIdxs,
+                                         List<List<byte[]>> predicateValueList,
+                                         List<byte[]> valuesCombination) {
+    if (hashBuckets.cardinality() == hashSchema.getNumBuckets()) {
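+      // Every bucket is already hit, so no further combination can change the result.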
+      return;
     }
-    for (PartialRow row : rows) {
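+    // 'level' is the index of the next hash column to pick a value for.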
+    int level = valuesCombination.size();
+    if (level == columnIdxs.size()) {
+      // At this point 'valuesCombination' holds one complete combination of
+      // values, ready for computing a hash bucket.
+      //
+      // The algorithm is DFS-like: it picks one value for each column in
+      // 'predicateValueList' and accumulates the picks in 'valuesCombination':
+      // 1. Pick a value from predicateValueList[0] for the column that
+      // columnIdxs[0] refers to. Any of the predicateValueList[0].size()
+      // values may be picked.
+      // 2. Pick a value from predicateValueList[1] for the column that
+      // columnIdxs[1] refers to, again with predicateValueList[1].size() choices.
+      // 3. Continue likewise until a value has been picked from the last list,
+      // predicateValueList[columnIdxs.size() - 1].
+      //
+      // The recursion ends once all combinations have been searched. Each
+      // complete 'valuesCombination' is used to construct a row, whose hash
+      // bucket is then computed.
+      PartialRow row = schema.newPartialRow();
+      for (int i = 0; i < valuesCombination.size(); i++) {
+        row.setRaw(columnIdxs.get(i), valuesCombination.get(i));
+      }
       int hash = KeyEncoder.getHashBucket(row, hashSchema);
       hashBuckets.set(hash);
+      return;
+    }
+    for (int i = 0; i < predicateValueList.get(level).size(); i++) {
+      valuesCombination.add(predicateValueList.get(level).get(i));
+      computeHashBuckets(schema, hashSchema, hashBuckets,
+                         columnIdxs, predicateValueList, valuesCombination);
+      valuesCombination.remove(valuesCombination.size() - 1);
     }
-    return hashBuckets;
   }
+
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 32c68ee7c..962bef2b0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -21,12 +21,20 @@ import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -35,6 +43,7 @@ import org.apache.kudu.client.KuduPredicate.ComparisonOp;
 import org.apache.kudu.test.KuduTestHarness;
 
 public class TestPartitionPruner {
+  public static final Logger LOG = LoggerFactory.getLogger(TestPartitionPruner.class);
 
   private KuduClient client;
 
@@ -46,6 +55,144 @@ public class TestPartitionPruner {
     client = harness.getClient();
   }
 
+  /**
+   * This method is the obsolete implementation of PartitionPruner.java#pruneHashComponent.
+   * The implementation is inefficient and may cause OOM.
+   *
+   * Searches all combinations of in-list and equality predicates for prunable hash partitions.
+   * It is kept only to check the new algorithm's correctness in tests.
+   * @deprecated kept only for tests; it can be removed later
+   * @return a bitset containing {@code false} bits for hash buckets which may be pruned
+   */
+  private static BitSet pruneHashComponent(Schema schema,
+                                           PartitionSchema.HashBucketSchema hashSchema,
+                                           Map<String, KuduPredicate> predicates) {
+    BitSet hashBuckets = new BitSet(hashSchema.getNumBuckets());
+    List<Integer> columnIdxs =
+        PartitionPruner.idsToIndexesForTest(schema, hashSchema.getColumnIds());
+    for (int idx : columnIdxs) {
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      if (predicate == null ||
+          (predicate.getType() != KuduPredicate.PredicateType.EQUALITY &&
+           predicate.getType() != KuduPredicate.PredicateType.IN_LIST)) {
+        hashBuckets.set(0, hashSchema.getNumBuckets());
+        return hashBuckets;
+      }
+    }
+
+    List<PartialRow> rows = Arrays.asList(schema.newPartialRow());
+    for (int idx : columnIdxs) {
+      List<PartialRow> newRows = new ArrayList<>();
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      List<byte[]> predicateValues;
+      if (predicate.getType() == KuduPredicate.PredicateType.EQUALITY) {
+        predicateValues = Collections.singletonList(predicate.getLower());
+      } else {
+        predicateValues = Arrays.asList(predicate.getInListValues());
+      }
+      // For each row accumulated so far, replicate it once per value of the
+      // equality or in-list predicate on this column.
+      for (PartialRow row : rows) {
+        for (byte[] predicateValue : predicateValues) {
+          PartialRow newRow = new PartialRow(row);
+          newRow.setRaw(idx, predicateValue);
+          newRows.add(newRow);
+        }
+      }
+      rows = newRows;
+    }
+    for (PartialRow row : rows) {
+      int hash = KeyEncoder.getHashBucket(row, hashSchema);
+      hashBuckets.set(hash);
+    }
+    return hashBuckets;
+  }
+
+  static class ReturnValueHelper {
+    private Schema schema;
+    private PartitionSchema partitionSchema;
+    private Map<String, KuduPredicate> predicates;
+
+    public ReturnValueHelper(Schema schema, PartitionSchema partitionSchema,
+        Map<String, KuduPredicate> predicates) {
+      this.schema = schema;
+      this.partitionSchema = partitionSchema;
+      this.predicates = predicates;
+    }
+  }
+
+  // Prepares test cases for unit tests that exercise large in-list predicates.
+  public List<ReturnValueHelper> prepareForLargeInListPredicates(KuduClient client,
+      String tablePrefix, int totalCount, int inListMaxLength) throws KuduException {
+    final int columnSize = 200;
+    String keyNamePrefix = "key";
+    final int keyColumnNumber = 10;
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    List<String> keyColumnNames = new ArrayList<>();
+    List<ColumnSchema> keyColumnSchemas = new ArrayList<>();
+    for (int i = 0; i < columnSize; i++) {
+      boolean isKey = false;
+      String columnName = keyNamePrefix + i;
+      if (i < keyColumnNumber) {
+        isKey = true;
+      }
+      ColumnSchema columnSchema = new ColumnSchema.ColumnSchemaBuilder(columnName, Type.INT32)
+          .key(isKey).build();
+      if (isKey) {
+        keyColumnNames.add(columnName);
+        keyColumnSchemas.add(columnSchema);
+      }
+      columnSchemas.add(columnSchema);
+    }
+
+    final Schema schema = new Schema(columnSchemas);
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(new ArrayList<>());
+    tableBuilder.addHashPartitions(ImmutableList.of(keyColumnNames.get(0)), 2);
+    tableBuilder.addHashPartitions(ImmutableList.of(
+        keyColumnNames.get(1), keyColumnNames.get(2)), 2);
+    tableBuilder.addHashPartitions(
+        ImmutableList.of(keyColumnNames.get(3), keyColumnNames.get(4)), 3);
+    tableBuilder.addHashPartitions(
+        ImmutableList.of(
+            keyColumnNames.get(5), keyColumnNames.get(6), keyColumnNames.get(7),
+            keyColumnNames.get(8), keyColumnNames.get(9)),
+        2);
+
+    String tableName = tablePrefix + "-" + System.currentTimeMillis();
+    client.createTable(tableName, schema, tableBuilder);
+    KuduTable table = client.openTable(tableName);
+
+    final int keyColumnCount = schema.getPrimaryKeyColumnCount();
+    assertEquals(keyColumnNumber, keyColumnCount);
+    List<ReturnValueHelper> helpList = new ArrayList<>();
+    for (int index = 1; index <= totalCount; index++) {
+      List<List<Integer>> testCases = new ArrayList<>();
+      Random r = new Random(System.currentTimeMillis());
+      for (int i = 0; i < keyColumnCount; i++) {
+        int inListLength = 1 + r.nextInt(inListMaxLength);
+        List<Integer> testCase = new ArrayList<>();
+        for (int j = 0; j < inListLength; j++) {
+          testCase.add(r.nextInt());
+        }
+        testCases.add(testCase);
+      }
+
+      KuduScanner.KuduScannerBuilder scanBuilder = client.newScannerBuilder(table);
+      Schema scanSchema = scanBuilder.table.getSchema();
+      PartitionSchema partitionSchema = scanBuilder.table.getPartitionSchema();
+      for (int i = 0; i < keyColumnCount; i++) {
+        KuduPredicate pred = KuduPredicate.newInListPredicate(keyColumnSchemas.get(i),
+            ImmutableList.copyOf(testCases.get(i)));
+        scanBuilder.addPredicate(pred);
+      }
+      helpList.add(new ReturnValueHelper(scanSchema, partitionSchema, scanBuilder.predicates));
+    }
+    return helpList;
+  }
+
   /**
    * Counts the partitions touched by a scan with optional primary key bounds.
    * The table is assumed to have three INT8 columns as the primary key.
@@ -679,6 +826,76 @@ public class TestPartitionPruner {
         KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1)),
         KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)),
         KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // a in [0, 1, 2], b in [0, 1, 2], c in [0, 1, 2];
+    checkPartitions(8, 8, table, partitions,
+        KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1, (byte) 2)),
+        KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1, (byte) 2)),
+        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1, (byte) 2)));
+
+    // a in [0, 1, 2, 3], b in [0, 1, 2, 3], c in [0, 1, 2, 3];
+    checkPartitions(8, 8, table, partitions,
+        KuduPredicate.newInListPredicate(
+            a, ImmutableList.of((byte) 0, (byte) 1, (byte) 2, (byte) 3)),
+        KuduPredicate.newInListPredicate(
+            b, ImmutableList.of((byte) 0, (byte) 1, (byte) 2, (byte) 3)),
+        KuduPredicate.newInListPredicate(
+            c, ImmutableList.of((byte) 0, (byte) 1, (byte) 2, (byte) 3)));
+
+    // The following test cases add more coverage to verify the new algorithm's correctness.
+    {
+      List<List<Integer>> expectedList = new ArrayList<>();
+      expectedList.add(ImmutableList.of(1, 1));
+      expectedList.add(ImmutableList.of(8, 8));
+      expectedList.add(ImmutableList.of(8, 8));
+      expectedList.add(ImmutableList.of(8, 8));
+      expectedList.add(ImmutableList.of(27, 1));
+      expectedList.add(ImmutableList.of(27, 1));
+      expectedList.add(ImmutableList.of(27, 1));
+      expectedList.add(ImmutableList.of(27, 1));
+      expectedList.add(ImmutableList.of(27, 1));
+      expectedList.add(ImmutableList.of(27, 1));
+
+      for (int size = 1; size <= 10; size++) {
+        int columnCount = schema.getColumnCount();
+        List<List<Byte>> testCases = new ArrayList<>();
+        for (int i = 0; i < columnCount; i++) {
+          List<Byte> testCase = new ArrayList<>();
+          for (int j = 0; j < size; j++) {
+            testCase.add((byte) j);
+          }
+          testCases.add(testCase);
+        }
+
+        KuduScanner.KuduScannerBuilder scanBuilder = client.newScannerBuilder(table);
+
+        List<ColumnSchema> columnSchemas = new ArrayList<>();
+        columnSchemas.add(a);
+        columnSchemas.add(b);
+        columnSchemas.add(c);
+        KuduPredicate[] predicates = new KuduPredicate[3];
+        for (int i = 0; i < 3; i++) {
+          predicates[i] = KuduPredicate.newInListPredicate(columnSchemas.get(i),
+              ImmutableList.copyOf(testCases.get(i)));
+          scanBuilder.addPredicate(predicates[i]);
+        }
+        checkPartitions(expectedList.get(size - 1).get(0),
+            expectedList.get(size - 1).get(1), table, partitions, predicates);
+        Schema scanSchema = scanBuilder.table.getSchema();
+        PartitionSchema partitionSchema = scanBuilder.table.getPartitionSchema();
+
+        List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
+            partitionSchema.getHashBucketSchemas();
+        assertEquals(columnCount, hashBucketSchemas.size());
+        for (PartitionSchema.HashBucketSchema hashSchema : hashBucketSchemas) {
+          BitSet oldBitset = pruneHashComponent(scanSchema, hashSchema,
+              scanBuilder.predicates);
+          BitSet newBitset = PartitionPruner.pruneHashComponentV2ForTest(scanSchema, hashSchema,
+              scanBuilder.predicates);
+          Assert.assertEquals(oldBitset, newBitset);
+        }
+      }
+    }
   }
 
   @Test
@@ -745,6 +962,71 @@ public class TestPartitionPruner {
                     KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
   }
 
+  // This unit test verifies the correctness of the new algorithm against the
+  // old one. It generates random lists of values small enough not to cause an
+  // OOM condition, checks that the two algorithms produce the same results,
+  // and compares their performance.
+  @Test
+  public void testInListHashPartitionPruningUsingLargeList() throws Exception {
+    // To test the normal case, this unit test should not run out of memory,
+    // so set totalCount = 100 and inListMaxLength = 10.
+    List<ReturnValueHelper> returnValues = this.prepareForLargeInListPredicates(client,
+        "testInListHashPartitionPruningUsingLargeList", 100, 10);
+    for (ReturnValueHelper returnValue : returnValues) {
+      long v1CostMillis = 0;
+      long v2CostMillis = 0;
+      long combinationCount = 1;
+      for (PartitionSchema.HashBucketSchema hashSchema :
+          returnValue.partitionSchema.getHashBucketSchemas()) {
+        long startTime = System.currentTimeMillis();
+        final BitSet oldBitset = pruneHashComponent(
+            returnValue.schema, hashSchema, returnValue.predicates);
+        v1CostMillis += (System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
+        final BitSet newBitset = PartitionPruner.pruneHashComponentV2ForTest(
+            returnValue.schema, hashSchema, returnValue.predicates);
+        v2CostMillis += (System.currentTimeMillis() - startTime);
+        Assert.assertEquals(oldBitset, newBitset);
+        combinationCount *= returnValue.predicates.size();
+      }
+      // The v2 algorithm is more efficient than v1: the new algorithm is
+      // roughly 100x faster than the old one. The following log line makes
+      // the two comparable.
+      if (v2CostMillis != 0 && v1CostMillis != 0) {
+        LOG.info("combination_count: {}, old algorithm " +
+            "cost: {}ms, new algorithm cost: {}ms, speedup: {}",
+            combinationCount, v1CostMillis, v2CostMillis,
+            (double) v1CostMillis / v2CostMillis);
+      }
+    }
+  }
+
+  // This unit test shows that very long in-list predicates can cause an OOM
+  // condition in the v1 algorithm (the old one), and that the v2 algorithm
+  // (the new one) avoids it. For details on testing for the OOM condition,
+  // see the in-line TODO comment at the end of this scenario.
+  @Test
+  public void testInListHashPartitionPruningUsingLargeListOOM() throws Exception {
+    // To test the OOM case, set totalCount = 10 and inListMaxLength = 100.
+    List<ReturnValueHelper> returnValues = this.prepareForLargeInListPredicates(client,
+        "testInListHashPartitionPruningUsingLargeListOOM", 10, 100);
+    for (ReturnValueHelper returnValue : returnValues) {
+      for (PartitionSchema.HashBucketSchema hashSchema :
+          returnValue.partitionSchema.getHashBucketSchemas()) {
+        // TODO(duyuqi)
+        // How should we add a regression test for the OOM condition? Running
+        // the old algorithm here fails with:
+        //   org.apache.kudu.client.TestPartitionPruner >
+        //     testInListHashPartitionPruningUsingLargeListOOM FAILED
+        //     java.lang.OutOfMemoryError: GC overhead limit exceeded
+        // PartitionPruner.pruneHashComponentV1ForTest(scanSchema, hashSchema,
+        //     scanBuilder.predicates);
+        PartitionPruner.pruneHashComponentV2ForTest(returnValue.schema, hashSchema,
+            returnValue.predicates);
+      }
+    }
+  }
+
   @Test
   public void testPruning() throws Exception {
     // CREATE TABLE timeseries