You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/30 01:02:03 UTC

[pinot] branch master updated: Allow MV Field Support For Raw Columns in Text Indices (#7638)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5becf5b  Allow MV Field Support For Raw Columns in Text Indices  (#7638)
5becf5b is described below

commit 5becf5b81c996a6be39a22fd99aacfa7f1b3e1ac
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Sat Oct 30 06:31:49 2021 +0530

    Allow MV Field Support For Raw Columns in Text Indices  (#7638)
    
    Use the new MV raw-byte forward index to support multi-value fields in text indices.
---
 .../pinot/queries/TextSearchQueriesTest.java       | 204 ++++++---------------
 .../creator/impl/SegmentColumnarIndexCreator.java  | 154 ++++++++--------
 .../impl/inv/text/LuceneFSTIndexCreator.java       |   5 +
 .../creator/impl/text/LuceneTextIndexCreator.java  |  20 ++
 .../local/segment/index/loader/LoaderUtils.java    |  13 +-
 .../loader/invertedindex/TextIndexHandler.java     |  85 ++++++---
 .../utils/nativefst/NativeFSTIndexCreator.java     |   5 +
 .../index/loader/SegmentPreProcessorTest.java      |  41 +++--
 .../resources/data/newColumnsSchemaWithText.json   |  10 +
 .../spi/index/creator/TextIndexCreator.java        |   5 +
 10 files changed, 280 insertions(+), 262 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index ef2f054..98f3bc5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -31,9 +31,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -96,14 +98,16 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
   private static final String SKILLS_TEXT_COL_DICT_NAME = "SKILLS_TEXT_COL_DICT";
   private static final String SKILLS_TEXT_COL_MULTI_TERM_NAME = "SKILLS_TEXT_COL_1";
   private static final String SKILLS_TEXT_NO_RAW_NAME = "SKILLS_TEXT_COL_2";
+  private static final String SKILLS_TEXT_MV_COL_NAME = "SKILLS_TEXT_MV_COL";
+  private static final String SKILLS_TEXT_MV_COL_DICT_NAME = "SKILLS_TEXT_MV_COL_DICT";
   private static final String INT_COL_NAME = "INT_COL";
-  private static final List<String> RAW_TEXT_INDEX_COLUMNS = Arrays
-      .asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_MULTI_TERM_NAME, SKILLS_TEXT_NO_RAW_NAME);
-  private static final List<String> DICT_TEXT_INDEX_COLUMNS = Arrays.asList(SKILLS_TEXT_COL_DICT_NAME);
+  private static final List<String> RAW_TEXT_INDEX_COLUMNS =
+      Arrays.asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_MULTI_TERM_NAME,
+          SKILLS_TEXT_NO_RAW_NAME, SKILLS_TEXT_MV_COL_NAME);
+  private static final List<String> DICT_TEXT_INDEX_COLUMNS =
+      Arrays.asList(SKILLS_TEXT_COL_DICT_NAME, SKILLS_TEXT_MV_COL_DICT_NAME);
   private static final int INT_BASE_VALUE = 1000;
 
-  private final List<GenericRow> _rows = new ArrayList<>();
-
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
 
@@ -157,8 +161,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
 
     List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_TEXT_INDEX_COLUMNS.size() + DICT_TEXT_INDEX_COLUMNS.size());
     for (String textIndexColumn : RAW_TEXT_INDEX_COLUMNS) {
-      fieldConfigs
-          .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null));
+      fieldConfigs.add(
+          new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null));
     }
     for (String textIndexColumn : DICT_TEXT_INDEX_COLUMNS) {
       fieldConfigs.add(
@@ -174,6 +178,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
         .addSingleValueDimension(SKILLS_TEXT_COL_DICT_NAME, FieldSpec.DataType.STRING)
         .addSingleValueDimension(SKILLS_TEXT_COL_MULTI_TERM_NAME, FieldSpec.DataType.STRING)
         .addSingleValueDimension(SKILLS_TEXT_NO_RAW_NAME, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(SKILLS_TEXT_MV_COL_NAME, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(SKILLS_TEXT_MV_COL_DICT_NAME, FieldSpec.DataType.STRING)
         .addMetric(INT_COL_NAME, FieldSpec.DataType.INT).build();
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
     config.setOutDir(INDEX_DIR.getPath());
@@ -197,24 +203,22 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
     List<GenericRow> rows = new ArrayList<>();
 
     // read the skills file
-    URL resourceUrl = getClass().getClassLoader().getResource("data/text_search_data/skills.txt");
-    File skillFile = new File(resourceUrl.getFile());
     String[] skills = new String[100];
+    List<String[]> multiValueStringList = new ArrayList<>();
     int skillCount = 0;
-    try (InputStream inputStream = new FileInputStream(skillFile);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+        Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt"))))) {
       String line;
       while ((line = reader.readLine()) != null) {
         skills[skillCount++] = line;
+        multiValueStringList.add(StringUtils.splitByWholeSeparator(line, ", "));
       }
     }
 
     // read the pql query log file (24k queries) and build dataset
-    resourceUrl = getClass().getClassLoader().getResource("data/text_search_data/pql_query1.txt");
-    File logFile = new File(resourceUrl.getFile());
     int counter = 0;
-    try (InputStream inputStream = new FileInputStream(logFile);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(Objects.requireNonNull(
+        getClass().getClassLoader().getResourceAsStream("data/text_search_data/pql_query1.txt"))))) {
       String line;
       while ((line = reader.readLine()) != null) {
         GenericRow row = new GenericRow();
@@ -224,12 +228,16 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
           row.putValue(SKILLS_TEXT_COL_NAME, "software engineering");
           row.putValue(SKILLS_TEXT_COL_DICT_NAME, "software engineering");
           row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, "software engineering");
-          row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, "software engineering");
+          row.putValue(SKILLS_TEXT_NO_RAW_NAME, "software engineering");
+          row.putValue(SKILLS_TEXT_MV_COL_NAME, new String[]{"software", "engineering"});
+          row.putValue(SKILLS_TEXT_MV_COL_DICT_NAME, new String[]{"software", "engineering"});
         } else {
           row.putValue(SKILLS_TEXT_COL_NAME, skills[counter]);
           row.putValue(SKILLS_TEXT_COL_DICT_NAME, skills[counter]);
           row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, skills[counter]);
           row.putValue(SKILLS_TEXT_NO_RAW_NAME, skills[counter]);
+          row.putValue(SKILLS_TEXT_MV_COL_NAME, multiValueStringList.get(counter));
+          row.putValue(SKILLS_TEXT_MV_COL_DICT_NAME, multiValueStringList.get(counter));
         }
         rows.add(row);
         counter++;
@@ -316,13 +324,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "management, docker image building and distribution"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Distributed systems\"') "
-            + "LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Distributed systems\"", expected);
 
     // TEST 5: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase
@@ -339,13 +341,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "management, docker image building and distribution"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"query processing\"') LIMIT"
-            + " 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"query processing\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"query processing\"", expected);
 
     // TEST 6: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain the phrase "machine
@@ -395,13 +391,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "management, docker image building and distribution"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\"') LIMIT"
-            + " 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Machine learning\"", expected);
 
     // TEST 7: composite phrase query using boolean operator AND
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent phrases
@@ -420,15 +410,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "performance scalable systems"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND "
-            + "\"Tensor flow\"') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND \"Tensor flow\"') "
-            + "LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Machine learning\" AND \"Tensor flow\"", expected);
 
     // TEST 8: term query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain term 'Java'.
@@ -481,11 +463,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "distributed storage, concurrency, multi-threading, apache airflow"
     });
 
-    query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("Java", expected);
 
     // TEST 9: composite term query using BOOLEAN operator AND
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent
@@ -529,12 +507,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "distributed storage, concurrency, multi-threading, apache airflow"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java AND C++') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java AND C++') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("Java AND C++", expected);
 
     // TEST 10: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "Java C++" as is.
@@ -563,12 +536,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "multi-threading, apache airflow"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Java C++\"') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Java C++\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Java C++\"", expected);
 
     // TEST 11: composite phrase query using boolean operator AND.
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent phrases
@@ -581,15 +549,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "performance scalable systems"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND "
-            + "\"gpu processing\"') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND \"gpu "
-            + "processing\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Machine learning\" AND \"gpu processing\"", expected);
 
     // TEST 12: composite phrase and term query using boolean operator AND.
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "machine learning"
@@ -608,14 +568,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "performance scalable systems"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND "
-            + "gpu') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Machine learning\" AND gpu", expected);
 
     // TEST 13: composite phrase and term query using boolean operator AND
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "machine learning"
@@ -635,15 +588,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "performance scalable systems"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu"
-            + " AND python') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu AND python') "
-            + "LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"Machine learning\" AND gpu AND python", expected);
 
     // TEST 14: term query
     // Search in SKILLS_TEXT_COL column to look for documents that MUST contain term 'apache'. The expected result
@@ -678,11 +623,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
         "Databases, columnar query processing, Apache Arrow, distributed systems, Machine learning, cluster "
             + "management, docker image building and distribution"
     });
-    query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'apache') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
 
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'apache') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("apache", expected);
 
     // TEST 15: composite phrase and term query using boolean operator AND.
     // search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "distributed
@@ -701,15 +643,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "management, docker image building and distribution"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND "
-            + "apache') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND apache') LIMIT "
-            + "50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"distributed systems\" AND apache", expected);
 
     // TEST 16: term query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain term 'database'.
@@ -744,11 +678,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + " building large scale systems"
     });
 
-    query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'database') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'database') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("database", expected);
 
     // TEST 17: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "database engine"
@@ -763,13 +693,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + " building large scale systems"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"database engine\"') LIMIT "
-            + "50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"database engine\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"database engine\"", expected);
 
     // TEST 18: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase
@@ -790,13 +714,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + "multi-threading, C++,"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"publish subscribe\"') "
-            + "LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"publish subscribe\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"publish subscribe\"", expected);
 
     // TEST 19: phrase query
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase
@@ -805,14 +723,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
     expected = new ArrayList<>();
     expected.add(new Serializable[]{1000, "Accounts, Banking, Insurance, worked in NGO, Java"});
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"accounts banking "
-            + "insurance\"') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"accounts banking insurance\"') LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"accounts banking insurance\"", expected);
 
     // TEST 20: composite term query with boolean operator AND
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain terms 'accounts'
@@ -826,15 +737,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
     expected.add(new Serializable[]{1001, "Accounts, Banking, Finance, Insurance"});
     expected.add(new Serializable[]{1002, "Accounts, Finance, Banking, Insurance"});
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'accounts AND banking AND "
-            + "insurance') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'accounts AND banking AND insurance') LIMIT "
-            + "50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("accounts AND banking AND insurance", expected);
 
     // TEST 21: composite phrase and term query using boolean operator AND.
     // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain ALL the following skills:
@@ -853,14 +756,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
             + " concurrency, multi-threading, C++, CPU processing, Java"
     });
 
-    query =
-        "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND "
-            + "Java AND C++') LIMIT 50000";
-    testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
-    query =
-        "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND Java AND C++') "
-            + "LIMIT 50000";
-    testTextSearchAggregationQueryHelper(query, expected.size());
+    testSkillsColumn("\"distributed systems\" AND Java AND C++", expected);
 
     // test for the index configured to use AND as the default
     // conjunction operator
@@ -1802,6 +1698,22 @@ public class TextSearchQueriesTest extends BaseQueriesTest {
     Assert.assertEquals(expectedCount, count);
   }
 
+  private void testSkillsColumn(String searchQuery, List<Serializable[]> expected)
+      throws Exception {
+    for (String skillColumn : Arrays.asList(SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_DICT_NAME,
+        SKILLS_TEXT_COL_MULTI_TERM_NAME, SKILLS_TEXT_NO_RAW_NAME, SKILLS_TEXT_MV_COL_NAME,
+        SKILLS_TEXT_MV_COL_DICT_NAME)) {
+      String query =
+          String.format("SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(%s, '%s') LIMIT 50000",
+              skillColumn, searchQuery);
+      testTextSearchSelectQueryHelper(query, expected.size(), false, expected);
+
+      query = String.format("SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(%s, '%s') LIMIT 50000", skillColumn,
+          searchQuery);
+      testTextSearchAggregationQueryHelper(query, expected.size());
+    }
+  }
+
   @Test
   public void testInterSegment() {
     String query =
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 0ea31d5..d2ac484 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -166,9 +166,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
     Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
     for (String columnName : h3IndexConfigs.keySet()) {
-      Preconditions
-          .checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema",
-              columnName);
+      Preconditions.checkState(schema.hasColumn(columnName),
+          "Cannot create H3 index for column: %s because it is not in schema", columnName);
     }
 
     // Initialize creators for dictionary, forward index and inverted index
@@ -206,8 +205,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         int cardinality = indexCreationInfo.getDistinctValueCount();
         if (fieldSpec.isSingleValueField()) {
           if (indexCreationInfo.isSorted()) {
-            _forwardIndexCreatorMap
-                .put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
+            _forwardIndexCreatorMap.put(columnName,
+                new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _forwardIndexCreatorMap.put(columnName,
                 new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
@@ -221,8 +220,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         // Initialize inverted index creator; skip creating inverted index if sorted
         if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
           if (segmentCreationSpec.isOnHeap()) {
-            _invertedIndexCreatorMap
-                .put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
+            _invertedIndexCreatorMap.put(columnName,
+                new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _invertedIndexCreatorMap.put(columnName,
                 new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
@@ -254,21 +253,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
       if (textIndexColumns.contains(columnName)) {
         // Initialize text index creator
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            "Text index is currently only supported on single-value columns");
-        Preconditions
-            .checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns");
-        _textIndexCreatorMap
-            .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+        Preconditions.checkState(storedType == DataType.STRING,
+            "Text index is currently only supported on STRING type columns");
+        _textIndexCreatorMap.put(columnName,
+            new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
       }
 
       if (fstIndexColumns.contains(columnName)) {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "FST index is currently only supported on single-value columns");
-        Preconditions
-            .checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns");
-        Preconditions
-            .checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns");
+        Preconditions.checkState(storedType == DataType.STRING,
+            "FST index is currently only supported on STRING type columns");
+        Preconditions.checkState(dictEnabledColumn,
+            "FST index is currently only supported on dictionary-encoded columns");
         _fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
             (String[]) indexCreationInfo.getSortedUniqueElementsArray()));
       }
@@ -276,8 +273,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       if (jsonIndexColumns.contains(columnName)) {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "Json index is currently only supported on single-value columns");
-        Preconditions
-            .checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns");
+        Preconditions.checkState(storedType == DataType.STRING,
+            "Json index is currently only supported on STRING columns");
         JsonIndexCreator jsonIndexCreator =
             segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
                 : new OffHeapJsonIndexCreator(_indexDir, columnName);
@@ -286,8 +283,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
       H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
       if (h3IndexConfig != null) {
-        Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns");
+        Preconditions.checkState(fieldSpec.isSingleValueField(),
+            "H3 index is currently only supported on single-value columns");
         Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
         H3IndexResolution resolution = h3IndexConfig.getResolution();
         GeoSpatialIndexCreator h3IndexCreator =
@@ -308,8 +305,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       Map<String, Map<String, String>> columnProperties) {
     if (columnProperties != null) {
       Map<String, String> properties = columnProperties.get(columnName);
-      return properties != null && Boolean
-          .parseBoolean(properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
+      return properties != null && Boolean.parseBoolean(
+          properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
     }
     return false;
   }
@@ -397,7 +394,22 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       // text-index
       TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
       if (textIndexCreator != null) {
-        textIndexCreator.add((String) columnValueToIndex);
+        if (fieldSpec.isSingleValueField()) {
+          textIndexCreator.add((String) columnValueToIndex);
+        } else {
+          Object[] values = (Object[]) columnValueToIndex;
+          int length = values.length;
+          if (values instanceof String[]) {
+            textIndexCreator.add((String[]) values, length);
+          } else {
+            String[] strings = new String[length];
+            for (int i = 0; i < length; i++) {
+              strings[i] = (String) values[i];
+            }
+            textIndexCreator.add(strings, length);
+            columnValueToIndex = strings;
+          }
+        }
       }
 
       if (fieldSpec.isSingleValueField()) {
@@ -461,8 +473,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           //dictionary encoded
           int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
           forwardIndexCreator.putDictIdMV(dictIds);
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
-              .get(columnName);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
           if (invertedIndexCreator != null) {
             invertedIndexCreator.add(dictIds, dictIds.length);
           }
@@ -470,76 +481,69 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           // for text index on raw columns, check the config to determine if actual raw value should
           // be stored or not
           if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
-            Object value = _columnProperties.get(columnName)
-                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
             if (value == null) {
               value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
             }
             if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
-              columnValueToIndex = new String[] {String.valueOf(value)};
+              columnValueToIndex = new String[]{String.valueOf(value)};
             } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
-              columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)};
+              columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)};
             } else {
               throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
             }
           }
+          Object[] values = (Object[]) columnValueToIndex;
+          int length = values.length;
           switch (forwardIndexCreator.getValueType()) {
             case INT:
-              if (columnValueToIndex instanceof Object[]) {
-                int[] array = new int[((Object[]) columnValueToIndex).length];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
-                }
-                forwardIndexCreator.putIntMV(array);
+              int[] ints = new int[length];
+              for (int i = 0; i < length; i++) {
+                ints[i] = (Integer) values[i];
               }
+              forwardIndexCreator.putIntMV(ints);
               break;
             case LONG:
-              if (columnValueToIndex instanceof Object[]) {
-                long[] array = new long[((Object[]) columnValueToIndex).length];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (Long) ((Object[]) columnValueToIndex)[i];
-                }
-                forwardIndexCreator.putLongMV(array);
+              long[] longs = new long[length];
+              for (int i = 0; i < length; i++) {
+                longs[i] = (Long) values[i];
               }
+              forwardIndexCreator.putLongMV(longs);
               break;
             case FLOAT:
-              if (columnValueToIndex instanceof Object[]) {
-                float[] array = new float[((Object[]) columnValueToIndex).length];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (Float) ((Object[]) columnValueToIndex)[i];
-                }
-                forwardIndexCreator.putFloatMV(array);
+              float[] floats = new float[length];
+              for (int i = 0; i < length; i++) {
+                floats[i] = (Float) values[i];
               }
+              forwardIndexCreator.putFloatMV(floats);
               break;
             case DOUBLE:
-              if (columnValueToIndex instanceof Object[]) {
-                double[] array = new double[((Object[]) columnValueToIndex).length];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (Double) ((Object[]) columnValueToIndex)[i];
-                }
-                forwardIndexCreator.putDoubleMV(array);
+              double[] doubles = new double[length];
+              for (int i = 0; i < length; i++) {
+                doubles[i] = (Double) values[i];
               }
+              forwardIndexCreator.putDoubleMV(doubles);
               break;
             case STRING:
-              if (columnValueToIndex instanceof String[]) {
-                forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
-                String[] array = new String[((Object[]) columnValueToIndex).length];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (String) ((Object[]) columnValueToIndex)[i];
+              if (values instanceof String[]) {
+                forwardIndexCreator.putStringMV((String[]) values);
+              } else {
+                String[] strings = new String[length];
+                for (int i = 0; i < length; i++) {
+                  strings[i] = (String) values[i];
                 }
-                forwardIndexCreator.putStringMV(array);
+                forwardIndexCreator.putStringMV(strings);
               }
               break;
             case BYTES:
-              if (columnValueToIndex instanceof byte[][]) {
-                forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
-                byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
-                for (int i = 0; i < array.length; i++) {
-                  array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+              if (values instanceof byte[][]) {
+                forwardIndexCreator.putBytesMV((byte[][]) values);
+              } else {
+                byte[][] bytesArray = new byte[length][];
+                for (int i = 0; i < length; i++) {
+                  bytesArray[i] = (byte[]) values[i];
                 }
-                forwardIndexCreator.putBytesMV(array);
+                forwardIndexCreator.putBytesMV(bytesArray);
               }
               break;
             default:
@@ -740,8 +744,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
     properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
-    properties
-        .setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+    properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+        String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
 
     PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
     if (partitionFunction != null) {
@@ -871,17 +875,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
             maxRowLengthInBytes, maxNumberOfMultiValueElements);
       default:
-        throw new UnsupportedOperationException(
-            "Data type not supported for raw indexing: " + dataType);
+        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
     }
   }
 
   @Override
   public void close()
       throws IOException {
-    FileUtils.close(Iterables
-        .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
-            _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(),
-            _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
+    FileUtils.close(Iterables.concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(),
+        _invertedIndexCreatorMap.values(), _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(),
+        _jsonIndexCreatorMap.values(), _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
index 5e78289..bb772ac 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
@@ -78,6 +78,11 @@ public class LuceneFSTIndexCreator implements TextIndexCreator {
   }
 
   @Override
+  public void add(String[] documents, int length) {
+    throw new UnsupportedOperationException("Multiple values not supported");
+  }
+
+  @Override
   public void seal()
       throws IOException {
     LOGGER.info("Sealing FST index: " + _fstIndexFile.getAbsolutePath());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index aa4e9b5..a46c481 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -121,6 +121,26 @@ public class LuceneTextIndexCreator implements TextIndexCreator {
   }
 
   @Override
+  public void add(String[] documents, int length) {
+    Document docToIndex = new Document();
+
+    // Whenever multiple fields with the same name appear in one document, both the
+    // inverted index and term vectors will logically append the tokens of the
+    // field to one another, in the order the fields were added.
+    for (int i = 0; i < length; i++) {
+      docToIndex.add(new TextField(_textColumn, documents[i], Field.Store.NO));
+    }
+    docToIndex.add(new StoredField(LUCENE_INDEX_DOC_ID_COLUMN_NAME, _nextDocId++));
+
+    try {
+      _indexWriter.addDocument(docToIndex);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Caught exception while adding a new document to the Lucene index for column: " + _textColumn, e);
+    }
+  }
+
+  @Override
   public void seal() {
     try {
       // Do this one time operation of combining the multiple lucene index files (if any)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
index a5b0843..b7083d0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
@@ -29,7 +29,9 @@ import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexCo
 import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -70,9 +72,14 @@ public class LoaderUtils {
             columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement());
       }
     } else {
-      DataType dataType = columnMetadata.getDataType();
-      return dataType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, dataType)
-          : new VarByteChunkSVForwardIndexReader(dataBuffer, dataType);
+      DataType storedType = columnMetadata.getDataType().getStoredType();
+      if (columnMetadata.isSingleValue()) {
+        return storedType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType)
+            : new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+      } else {
+        return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
+            : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
+      }
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 53134cb..c12308e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -37,6 +37,7 @@
 package org.apache.pinot.segment.local.segment.index.loader.invertedindex;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -45,10 +46,9 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
-import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.TextIndexType;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -123,9 +123,8 @@ public class TextIndexHandler implements IndexHandler {
   }
 
   /**
-   * Right now the text index is supported on RAW and dictionary encoded
-   * single-value STRING columns. Later we can add support for text index
-   * on multi-value columns and BYTE type columns
+   * Right now the text index is supported only on STRING columns (single-value or multi-value).
+   * Later we can add support for text index on BYTES columns.
    * @param columnMetadata metadata for column
    */
   private void checkUnsupportedOperationsForTextIndex(ColumnMetadata columnMetadata) {
@@ -133,10 +132,6 @@ public class TextIndexHandler implements IndexHandler {
     if (columnMetadata.getDataType() != DataType.STRING) {
       throw new UnsupportedOperationException("Text index is currently only supported on STRING columns: " + column);
     }
-    if (!columnMetadata.isSingleValue()) {
-      throw new UnsupportedOperationException(
-          "Text index is currently not supported on multi-value columns: " + column);
-    }
   }
 
   private void createTextIndexForColumn(ColumnMetadata columnMetadata)
@@ -156,24 +151,10 @@ public class TextIndexHandler implements IndexHandler {
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
         LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
-      if (!hasDictionary) {
-        // text index on raw column, just read the raw forward index
-        VarByteChunkSVForwardIndexReader rawIndexReader = (VarByteChunkSVForwardIndexReader) forwardIndexReader;
-        BaseChunkSVForwardIndexReader.ChunkReaderContext chunkReaderContext =
-            (BaseChunkSVForwardIndexReader.ChunkReaderContext) readerContext;
-        for (int docId = 0; docId < numDocs; docId++) {
-          textIndexCreator.add(rawIndexReader.getString(docId, chunkReaderContext));
-        }
+      if (columnMetadata.isSingleValue()) {
+        processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
       } else {
-        // text index on dictionary encoded SV column
-        // read forward index to get dictId
-        // read the raw value from dictionary using dictId
-        try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
-          for (int docId = 0; docId < numDocs; docId++) {
-            int dictId = forwardIndexReader.getDictId(docId, readerContext);
-            textIndexCreator.add(dictionary.getStringValue(dictId));
-          }
-        }
+        processMVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
       }
       textIndexCreator.seal();
     }
@@ -183,4 +164,56 @@ public class TextIndexHandler implements IndexHandler {
     properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), TextIndexType.LUCENE.name());
     properties.save();
   }
+
+  private void processSVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
+      ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
+      ColumnMetadata columnMetadata)
+      throws IOException {
+    if (!hasDictionary) {
+      // text index on raw column, just read the raw forward index
+      for (int docId = 0; docId < numDocs; docId++) {
+        textIndexCreator.add(forwardIndexReader.getString(docId, readerContext));
+      }
+    } else {
+      // text index on dictionary encoded SV column
+      // read forward index to get dictId
+      // read the raw value from dictionary using dictId
+      try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+        for (int docId = 0; docId < numDocs; docId++) {
+          int dictId = forwardIndexReader.getDictId(docId, readerContext);
+          textIndexCreator.add(dictionary.getStringValue(dictId));
+        }
+      }
+    }
+  }
+
+  private void processMVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
+      ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
+      ColumnMetadata columnMetadata)
+      throws IOException {
+    if (!hasDictionary) {
+      // text index on raw column, just read the raw forward index
+      String[] valueBuffer = new String[columnMetadata.getMaxNumberOfMultiValues()];
+      for (int docId = 0; docId < numDocs; docId++) {
+        int length = forwardIndexReader.getStringMV(docId, valueBuffer, readerContext);
+        textIndexCreator.add(valueBuffer, length);
+      }
+    } else {
+      // text index on dictionary encoded MV column
+      // read forward index to get dictId
+      // read the raw value from dictionary using dictId
+      try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+        int maxNumEntries = columnMetadata.getMaxNumberOfMultiValues();
+        int[] dictIdBuffer = new int[maxNumEntries];
+        String[] valueBuffer = new String[maxNumEntries];
+        for (int docId = 0; docId < numDocs; docId++) {
+          int length = forwardIndexReader.getDictIdMV(docId, dictIdBuffer, readerContext);
+          for (int i = 0; i < length; i++) {
+            valueBuffer[i] = dictionary.getStringValue(dictIdBuffer[i]);
+          }
+          textIndexCreator.add(valueBuffer, length);
+        }
+      }
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java
index a97bb80..678105a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java
@@ -66,6 +66,11 @@ public class NativeFSTIndexCreator implements TextIndexCreator {
   }
 
   @Override
+  public void add(String[] document, int length) {
+    throw new UnsupportedOperationException("Multiple values not supported");
+  }
+
+  @Override
   public void seal()
       throws IOException {
     LOGGER.info("Sealing FST index: " + _fstIndexFile.getAbsolutePath());
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 2a16478..8a2fb82 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -85,6 +85,8 @@ public class SegmentPreProcessorTest {
   private static final String EXISTING_STRING_COL_DICT = "column5";
   private static final String NEWLY_ADDED_STRING_COL_RAW = "newTextColRaw";
   private static final String NEWLY_ADDED_STRING_COL_DICT = "newTextColDict";
+  private static final String NEWLY_ADDED_STRING_MV_COL_RAW = "newTextMVColRaw";
+  private static final String NEWLY_ADDED_STRING_MV_COL_DICT = "newTextMVColDict";
 
   // For create fst index tests
   private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
@@ -214,8 +216,10 @@ public class SegmentPreProcessorTest {
       throws Exception {
     Set<String> textIndexColumns = new HashSet<>();
     textIndexColumns.add(NEWLY_ADDED_STRING_COL_RAW);
+    textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_RAW);
     _indexLoadingConfig.setTextIndexColumns(textIndexColumns);
     _indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_COL_RAW);
+    _indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_MV_COL_RAW);
 
     // Create a segment in V3, add a new raw column with text index enabled
     constructV3Segment();
@@ -224,6 +228,8 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
     checkTextIndexCreation(NEWLY_ADDED_STRING_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
+    checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false,
+        1);
 
     // Create a segment in V1, add a new raw column with text index enabled
     constructV1Segment();
@@ -303,6 +309,7 @@ public class SegmentPreProcessorTest {
       throws Exception {
     Set<String> textIndexColumns = new HashSet<>();
     textIndexColumns.add(NEWLY_ADDED_STRING_COL_DICT);
+    textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_DICT);
     _indexLoadingConfig.setTextIndexColumns(textIndexColumns);
 
     // Create a segment in V3, add a new dict encoded column with text index enabled
@@ -313,6 +320,9 @@ public class SegmentPreProcessorTest {
     assertNull(columnMetadata);
     checkTextIndexCreation(NEWLY_ADDED_STRING_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
 
+    checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false,
+        1);
+
     // Create a segment in V1, add a new dict encoded column with text index enabled
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -385,18 +395,27 @@ public class SegmentPreProcessorTest {
       boolean isSorted, int dictionaryElementSize)
       throws Exception {
     checkIndexCreation(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true, isSorted,
-        dictionaryElementSize);
+        dictionaryElementSize, true, 0);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
     checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
-        isSorted, dictionaryElementSize);
+        isSorted, dictionaryElementSize, true, 0);
+  }
+
+  private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
+      boolean hasDictionary, boolean isSorted, int dictionaryElementSize, boolean isSingleValue,
+      int maxNumberOfMultiValues)
+      throws Exception {
+    checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
+        isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues);
   }
 
   private void checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
-      boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
+      boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
+      boolean isSingleValued, int maxNumberOfMultiValues)
       throws Exception {
 
     try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
@@ -405,14 +424,14 @@ public class SegmentPreProcessorTest {
       processor.process();
       SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
-      assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, true));
+      assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
       assertEquals(columnMetadata.getCardinality(), cardinality);
       assertEquals(columnMetadata.getTotalDocs(), 100000);
       assertEquals(columnMetadata.getBitsPerElement(), bits);
       assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
       assertEquals(columnMetadata.isSorted(), isSorted);
       assertEquals(columnMetadata.hasDictionary(), hasDictionary);
-      assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0);
+      assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
       assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
       assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
 
@@ -761,8 +780,8 @@ public class SegmentPreProcessorTest {
     Iterator<String> keys = configuration.getKeys();
     while (keys.hasNext()) {
       String key = keys.next();
-      if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key
-          .endsWith(V1Constants.MetadataKeys.Column.MAX_VALUE)) {
+      if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
+          V1Constants.MetadataKeys.Column.MAX_VALUE)) {
         configuration.clearProperty(key);
       }
     }
@@ -970,8 +989,8 @@ public class SegmentPreProcessorTest {
     assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol"));
 
     _indexLoadingConfig = new IndexLoadingConfig();
-    _indexLoadingConfig
-        .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
+    _indexLoadingConfig.setH3IndexConfigs(
+        ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
     _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")));
 
     // V1 use separate file for each column index.
@@ -1025,8 +1044,8 @@ public class SegmentPreProcessorTest {
     long initFileSize = singleFileIndex.length();
 
     _indexLoadingConfig = new IndexLoadingConfig();
-    _indexLoadingConfig
-        .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
+    _indexLoadingConfig.setH3IndexConfigs(
+        ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
     _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")));
 
     // Create H3 and Json indices.
diff --git a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json
index b2975d4..dda33ba 100644
--- a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json
+++ b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json
@@ -30,6 +30,16 @@
       "dataType": "STRING"
     },
     {
+      "name": "newTextMVColRaw",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
+      "name": "newTextMVColDict",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
       "name": "column6",
       "dataType": "INT",
       "singleValueField": false
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java
index 4a9d6cf..bad466e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java
@@ -33,6 +33,11 @@ public interface TextIndexCreator extends Closeable {
   void add(String document);
 
   /**
+   * Adds a multi-value entry to the index, using the first {@code length} elements of the array
+   */
+  void add(String[] document, int length);
+
+  /**
    * Seals the index and flushes it to disk.
    */
   void seal()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org