Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/10 08:35:06 UTC

[GitHub] [incubator-pinot] npawar opened a new pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

npawar opened a new pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681


   ## Description
   Add a TransformConfig list to IngestionConfig. This is based on the discussion in the Extensions section of the Filtering design doc: https://docs.google.com/document/d/1Cahnas3nh0XErETH0KHLaecN6xCnRVYWNKO3rDn7qcI/edit#heading=h.gr3yby6x7mlv
   
   ## Release Notes
   Transform functions can now be set in the table config instead of the schema. In a future release, support for transform functions in the schema will be removed.
   
   ## Documentation
   TBD for gitbooks
   Example config
   ```
   "ingestionConfig": {
         "filterConfig": {
           "filterFunction": "Groovy({foo == \"VALUE1\"}, foo)"
         },
         "transformConfigs": [{
           "columnName": "bar",
           "transformFunction": "lower(moo)"
         },
         {
           "columnName": "hoursSinceEpoch",
           "transformFunction": "toEpochHours(millis)"
         }]
       }
   ```
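   For reference, here is a rough sketch of how an equivalent config could be built programmatically, mirroring the constructor shapes exercised in this PR's integration test. The classes below are minimal stand-ins for illustration only, not the actual `org.apache.pinot.spi.config.table.ingestion` classes:

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Minimal stand-ins for Pinot's ingestion config classes; the real ones
   // live in org.apache.pinot.spi.config.table.ingestion and carry more fields.
   class FilterConfig {
     final String filterFunction;

     FilterConfig(String filterFunction) {
       this.filterFunction = filterFunction;
     }
   }

   class TransformConfig {
     final String columnName;
     final String transformFunction;

     TransformConfig(String columnName, String transformFunction) {
       this.columnName = columnName;
       this.transformFunction = transformFunction;
     }
   }

   public class IngestionConfigExample {
     // Mirrors the "transformConfigs" entries of the JSON example above
     static List<TransformConfig> exampleTransforms() {
       return Arrays.asList(
           new TransformConfig("bar", "lower(moo)"),
           new TransformConfig("hoursSinceEpoch", "toEpochHours(millis)"));
     }

     // Mirrors the "filterConfig" entry of the JSON example above
     static FilterConfig exampleFilter() {
       return new FilterConfig("Groovy({foo == \"VALUE1\"}, foo)");
     }
   }
   ```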
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar merged pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar merged pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681


   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453979435



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -67,12 +70,24 @@ private static void extractFieldsFromSchema(Schema schema, Set<String> fields) {
    * Extracts the fields needed by a RecordExtractor from given {@link IngestionConfig}
    */
   private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig ingestionConfig, Set<String> fields) {
-    if (ingestionConfig != null && ingestionConfig.getFilterConfig() != null) {
-      String filterFunction = ingestionConfig.getFilterConfig().getFilterFunction();
-      if (filterFunction != null) {
-        FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
-        if (functionEvaluator != null) {
-          fields.addAll(functionEvaluator.getArguments());
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          if (functionEvaluator != null) {
+            fields.addAll(functionEvaluator.getArguments());
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        for (TransformConfig transformConfig : transformConfigs) {
+          FunctionEvaluator expressionEvaluator =
+              FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction());
+          fields.addAll(expressionEvaluator.getArguments());
+          fields.add(transformConfig.getColumnName());

Review comment:
       If the transformation is already done at the source, this ensures we avoid doing it again. This behavior is carried over from when transform functions lived in the schema.
   Added a code comment too
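
   As a rough illustration of the skip-if-already-present behavior this comment describes (a hypothetical helper, not the actual ExpressionTransformer code):

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Function;

   public class SkipIfPresent {
     // Evaluate the transform only when the destination column was not already
     // populated at the source, so a source-side transformation is not redone.
     static void applyTransform(Map<String, Object> record, String destColumn,
         Function<Map<String, Object>, Object> evaluator) {
       if (record.get(destColumn) == null) {
         record.put(destColumn, evaluator.apply(record));
       }
     }

     static Map<String, Object> demo() {
       Map<String, Object> record = new HashMap<>();
       record.put("moo", "ABC");
       record.put("bar", "already-set");  // pretend the source already set it
       // "bar" is left untouched; the evaluator runs only for missing columns
       applyTransform(record, "bar", r -> ((String) r.get("moo")).toLowerCase());
       applyTransform(record, "baz", r -> ((String) r.get("moo")).toLowerCase());
       return record;
     }
   }
   ```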






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453980640



##########
File path: pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema
##########
@@ -0,0 +1,342 @@
+{

Review comment:
       The schema contains some fields which are not in the source data. I preferred adding a new file instead of putting it directly in the test file, but I have now changed it to override createSchema instead.






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r454009172



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {

Review comment:
       I have yet to address a few comments on mine. Will do by EOD. This one can go first and I will rebase.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453846803



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {
+
+  private TableConfigUtils() {
+
+  }
+
+  /**
+   * Validates the table config with the following rules:
+   * <ul>
+   *   <li>Text index column must be raw</li>
+   *   <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li>
+   * </ul>
+   */
+  public static void validate(TableConfig tableConfig) {
+    validateFieldConfigList(tableConfig);
+    validateValidationConfig(tableConfig);
+    validateIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  private static void validateFieldConfigList(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns();
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) {
+          // For Text index column, it must be raw (no-dictionary)
+          // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs
+          String column = fieldConfig.getName();
+          if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null
+              || !noDictionaryColumns.contains(column)) {
+            throw new IllegalStateException(
+                "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig");
+          }
+        }
+      }
+    }
+  }
+
+  private static void validateValidationConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig != null) {
+      if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) {
+        throw new IllegalStateException("Must provide time column in real-time table config");
+      }
+      String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
+      if (peerSegmentDownloadScheme != null) {
+        if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
+          throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https");
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates the following:
+   * 1. validity of filter function
+   * 2. checks for duplicate transform configs
+   * 3. checks for null column name or transform function in transform config
+   * 4. validity of transform function string
+   * 5. checks for source fields used in destination columns
+   */
+  private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) {
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          try {
+            FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException("Invalid filter function " + filterFunction, e);
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        Set<String> transformColumns = new HashSet<>();
+        for (TransformConfig transformConfig : transformConfigs) {
+          String columnName = transformConfig.getColumnName();
+          if (transformColumns.contains(columnName)) {
+            throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
+          }
+          transformColumns.add(columnName);

Review comment:
       ```suggestion
             if (!transformColumns.add(columnName)) {
               throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
             }
   ```
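
   The suggestion relies on `Set.add` returning `false` when the element is already present, which folds the membership check and the insert into a single call. A self-contained illustration of the idiom (hypothetical helper, not code from this PR):

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   public class DuplicateColumnCheck {
     // Returns the first duplicated name, or null when all names are distinct.
     // Set.add returns false if the element was already in the set, so the
     // contains() + add() pair collapses into one call.
     static String firstDuplicate(List<String> columnNames) {
       Set<String> seen = new HashSet<>();
       for (String name : columnNames) {
         if (!seen.add(name)) {
           return name;
         }
       }
       return null;
     }
   }
   ```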

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTest.java
##########
@@ -43,14 +43,28 @@
 public class ExpressionTransformerTest {
 
   @Test
-  public void testGroovyExpressionTransformer()
-      throws IOException {
-    URL resource = AbstractRecordExtractorTest.class.getClassLoader()

Review comment:
       Should we keep a test for transform in schema to ensure this change is backward-compatible? We can remove the test when we remove the schema transform support.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/Temp.java
##########
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Hybrid cluster integration test that uses one of the DateTimeFieldSpec as primary time column
+ */
+public class Temp extends BaseClusterIntegrationTest {

Review comment:
       Remove this class

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
##########
@@ -36,12 +38,19 @@
 
   private final Map<String, FunctionEvaluator> _expressionEvaluators = new HashMap<>();
 
-  public ExpressionTransformer(Schema schema) {
+  public ExpressionTransformer(Schema schema, TableConfig tableConfig) {

Review comment:
       (nit) Let's put `tableConfig` in front of `schema`

##########
File path: pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema
##########
@@ -0,0 +1,342 @@
+{

Review comment:
       Why do we need this new schema?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {
+
+  private TableConfigUtils() {
+
+  }
+
+  /**
+   * Validates the table config with the following rules:
+   * <ul>
+   *   <li>Text index column must be raw</li>
+   *   <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li>
+   * </ul>
+   */
+  public static void validate(TableConfig tableConfig) {
+    validateFieldConfigList(tableConfig);
+    validateValidationConfig(tableConfig);
+    validateIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  private static void validateFieldConfigList(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns();
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) {
+          // For Text index column, it must be raw (no-dictionary)
+          // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs
+          String column = fieldConfig.getName();
+          if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null
+              || !noDictionaryColumns.contains(column)) {
+            throw new IllegalStateException(
+                "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig");
+          }
+        }
+      }
+    }
+  }
+
+  private static void validateValidationConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig != null) {
+      if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) {
+        throw new IllegalStateException("Must provide time column in real-time table config");
+      }
+      String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
+      if (peerSegmentDownloadScheme != null) {
+        if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
+          throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https");
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates the following:
+   * 1. validity of filter function
+   * 2. checks for duplicate transform configs
+   * 3. checks for null column name or transform function in transform config
+   * 4. validity of transform function string
+   * 5. checks for source fields used in destination columns
+   */
+  private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) {
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          try {
+            FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException("Invalid filter function " + filterFunction, e);
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        Set<String> transformColumns = new HashSet<>();
+        for (TransformConfig transformConfig : transformConfigs) {
+          String columnName = transformConfig.getColumnName();
+          if (transformColumns.contains(columnName)) {
+            throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
+          }
+          transformColumns.add(columnName);
+          String transformFunction = transformConfig.getTransformFunction();
+          if (columnName == null || transformFunction == null) {
+            throw new IllegalStateException("columnName/transformFunction cannot be null in TransformConfig " + transformConfig);
+          }
+          FunctionEvaluator expressionEvaluator;
+          try {
+            expressionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(transformFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid transform function '" + transformFunction + "' for column '" + columnName + "'");
+          }
+          List<String> arguments = expressionEvaluator.getArguments();

Review comment:
       Should we check that arguments are not contained in the `transformColumns`? We do not support chained transforms currently
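
   One way the proposed check could look, assuming transforms are validated in declaration order (a hypothetical helper sketching the idea, not code from this PR):

   ```java
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class ChainedTransformCheck {
     // Returns the first destination column whose transform consumes the
     // output of an earlier transform (chained transforms are unsupported),
     // or null when no transform references another transform's output.
     static String firstChained(LinkedHashMap<String, List<String>> columnToArguments) {
       Set<String> produced = new HashSet<>();
       for (Map.Entry<String, List<String>> entry : columnToArguments.entrySet()) {
         for (String argument : entry.getValue()) {
           if (produced.contains(argument)) {
             return entry.getKey();
           }
         }
         produced.add(entry.getKey());
       }
       return null;
     }
   }
   ```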

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @Override
+  protected String getTimeColumnName() {
+    return TIME_COLUMN_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 22300;

Review comment:
       This result should be the same as `select count(*) from mytable where AirlineID != 19393 AND ArrDelayMinutes > 5` in the other integration tests, where I got 24047.
   Please document how this number is calculated. When we add a test, we should not run the test and directly use its output as the expected value, because that won't catch bugs in the code or in the test logic.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @Override
+  protected String getTimeColumnName() {
+    return TIME_COLUMN_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 22300;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    FilterConfig filterConfig = new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)");
+    List<TransformConfig> transformConfigs = new ArrayList<>();
+    transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)"));
+    transformConfigs.add(new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)"));
+    transformConfigs.add(new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
+    return new IngestionConfig(filterConfig, transformConfigs);
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startKafka();
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(0, avroFiles.size() - 1), tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    List<File> realtimeAvroFile = Lists.newArrayList(avroFiles.get(avroFiles.size() - 1));
+    addTableConfig(createRealtimeTableConfig(realtimeAvroFile.get(0)));
+    pushAvroIntoKafka(realtimeAvroFile);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Test
+  public void testQueries()
+      throws Exception {
+    // Select column created with transform function
+    String sqlQuery = "Select millisSinceEpoch from " + DEFAULT_TABLE_NAME;
+    JsonNode response = postSqlQuery(sqlQuery);
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "millisSinceEpoch");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG");
+
+    // Select column created with transform function
+    sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME;
+    response = postSqlQuery(sqlQuery);
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT");
+    for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) {
+      String amPm = response.get("resultTable").get("rows").get(i).get(0).asText();
+      int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt();
+      Assert.assertEquals(amPm, (depTime < 1200) ? "AM" : "PM");
+    }
+
+    // Select column created with transform function - offline table
+    sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME + "_OFFLINE";
+    response = postSqlQuery(sqlQuery);
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT");
+    for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) {
+      String amPm = response.get("resultTable").get("rows").get(i).get(0).asText();
+      int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt();
+      Assert.assertEquals(amPm, (depTime < 1200) ? "AM" : "PM");
+    }
+
+    // Select column created with transform - realtime table
+    sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME + "_REALTIME";
+    response = postSqlQuery(sqlQuery);
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING");
+    assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT");
+    for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) {
+      String amPm = response.get("resultTable").get("rows").get(i).get(0).asText();
+      int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt();
+      Assert.assertEquals(amPm, (depTime < 1200) ? "AM" : "PM");
+    }
+
+    // Check there are no values that should have been filtered
+    sqlQuery = "Select * from " + DEFAULT_TABLE_NAME
+        + "  where AirlineID = 19393 or ArrDelayMinutes <= 5";
+    response = postSqlQuery(sqlQuery);
+    Assert.assertEquals(response.get("resultTable").get("rows").size(), 0);
+
+    // Check there are no values that should have been filtered - realtime table
+    sqlQuery = "Select * from " + DEFAULT_TABLE_NAME + "_REALTIME"
+        + "  where AirlineID = 19393 or ArrDelayMinutes <= 5";
+    response = postSqlQuery(sqlQuery);
+    Assert.assertEquals(response.get("resultTable").get("rows").size(), 0);
+
+    // Check there are no values that should have been filtered - offline table
+    sqlQuery = "Select * from " + DEFAULT_TABLE_NAME + "_OFFLINE"
+        + "  where AirlineID = 19393 or ArrDelayMinutes <= 5";
+    response = postSqlQuery(sqlQuery);
+    Assert.assertEquals(response.get("resultTable").get("rows").size(), 0);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+

Review comment:
       (nit) remove empty line

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";

Review comment:
       Make a simplified schema that contains only the columns needed for the test.
   You can directly override `createSchema()`.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @Override
+  protected String getTimeColumnName() {
+    return TIME_COLUMN_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 22300;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    FilterConfig filterConfig = new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)");
+    List<TransformConfig> transformConfigs = new ArrayList<>();
+    transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)"));
+    transformConfigs.add(new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)"));
+    transformConfigs.add(new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
+    return new IngestionConfig(filterConfig, transformConfigs);
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startKafka();
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(0, avroFiles.size() - 1), tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    List<File> realtimeAvroFile = Lists.newArrayList(avroFiles.get(avroFiles.size() - 1));

Review comment:
       No overlapping segments? The result won't be correct. Please use the set-up logic from `HybridClusterIntegrationTest`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {

Review comment:
       This will conflict with #5667. Let's figure out the merge order for these two PRs.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -67,12 +70,24 @@ private static void extractFieldsFromSchema(Schema schema, Set<String> fields) {
    * Extracts the fields needed by a RecordExtractor from given {@link IngestionConfig}
    */
   private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig ingestionConfig, Set<String> fields) {
-    if (ingestionConfig != null && ingestionConfig.getFilterConfig() != null) {
-      String filterFunction = ingestionConfig.getFilterConfig().getFilterFunction();
-      if (filterFunction != null) {
-        FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
-        if (functionEvaluator != null) {
-          fields.addAll(functionEvaluator.getArguments());
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          if (functionEvaluator != null) {
+            fields.addAll(functionEvaluator.getArguments());
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        for (TransformConfig transformConfig : transformConfigs) {
+          FunctionEvaluator expressionEvaluator =
+              FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction());
+          fields.addAll(expressionEvaluator.getArguments());
+          fields.add(transformConfig.getColumnName());

Review comment:
       Please add a comment explaining why we extract both the input and output columns
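A standalone sketch of the point behind that comment, using hypothetical simplified types (`TransformSpec`, `FieldExtraction` — not Pinot's actual SPI classes): the record extractor needs both the source arguments that feed each transform and the destination column itself, e.g. so that a value already present in the input record for that column is not silently dropped.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Pinot's TransformConfig + FunctionEvaluator pair:
// each transform reads some source arguments and writes one destination column.
class TransformSpec {
    final String columnName;       // destination column
    final List<String> arguments;  // source columns the function reads
    TransformSpec(String columnName, List<String> arguments) {
        this.columnName = columnName;
        this.arguments = arguments;
    }
}

class FieldExtraction {
    // Collect every field the record extractor must read: the source
    // arguments feed the transform function, and the destination column is
    // extracted as well, in case the input record already carries a value
    // for it.
    static Set<String> extractFields(List<TransformSpec> specs) {
        Set<String> fields = new HashSet<>();
        for (TransformSpec spec : specs) {
            fields.addAll(spec.arguments);
            fields.add(spec.columnName);
        }
        return fields;
    }
}
```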

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {
+
+  private TableConfigUtils() {
+
+  }
+
+  /**
+   * Validates the table config with the following rules:
+   * <ul>
+   *   <li>Text index column must be raw</li>
+   *   <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li>
+   *   <li>IngestionConfig must contain valid filter and transform functions</li>
+   * </ul>
+   */
+  public static void validate(TableConfig tableConfig) {
+    validateFieldConfigList(tableConfig);
+    validateValidationConfig(tableConfig);
+    validateIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  private static void validateFieldConfigList(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns();
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) {
+          // For Text index column, it must be raw (no-dictionary)
+          // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs
+          String column = fieldConfig.getName();
+          if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null
+              || !noDictionaryColumns.contains(column)) {
+            throw new IllegalStateException(
+                "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig");
+          }
+        }
+      }
+    }
+  }
+
+  private static void validateValidationConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig != null) {
+      if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) {
+        throw new IllegalStateException("Must provide time column in real-time table config");
+      }
+      String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
+      if (peerSegmentDownloadScheme != null) {
+        if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
+          throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be either http or https");
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates the following:
+   * 1. validity of filter function
+   * 2. checks for duplicate transform configs
+   * 3. checks for null column name or transform function in transform config
+   * 4. validity of transform function string
+   * 5. checks for source fields used in destination columns
+   */
+  private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) {
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          try {
+            FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException("Invalid filter function " + filterFunction, e);
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        Set<String> transformColumns = new HashSet<>();
+        for (TransformConfig transformConfig : transformConfigs) {
+          String columnName = transformConfig.getColumnName();
+          if (transformColumns.contains(columnName)) {
+            throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
+          }
+          transformColumns.add(columnName);
+          String transformFunction = transformConfig.getTransformFunction();
+          if (columnName == null || transformFunction == null) {

Review comment:
       Perform null check before the set check
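A minimal standalone sketch of the reordered check the reviewer asks for, using a hypothetical simplified config type (not Pinot's actual `TransformConfig`): null-checking before touching the duplicate-tracking set means a missing field is reported as such, instead of a second null `columnName` tripping the duplicate check (HashSet happily stores null).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplified transform-config entry
class TransformEntry {
    final String columnName;
    final String transformFunction;
    TransformEntry(String columnName, String transformFunction) {
        this.columnName = columnName;
        this.transformFunction = transformFunction;
    }
}

class TransformValidation {
    // Null-check first: otherwise a null columnName would be added to the
    // set (HashSet accepts null), and a later null entry would surface as
    // a misleading "duplicate" error rather than a missing-field error.
    static void validate(List<TransformEntry> entries) {
        Set<String> seen = new HashSet<>();
        for (TransformEntry e : entries) {
            if (e.columnName == null || e.transformFunction == null) {
                throw new IllegalStateException(
                    "columnName and transformFunction must both be set");
            }
            if (!seen.add(e.columnName)) {
                throw new IllegalStateException(
                    "Duplicate transform config found for column '" + e.columnName + "'");
            }
        }
    }
}
```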

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {

Review comment:
       extend `BaseClusterIntegrationTest` instead of `BaseClusterIntegrationTestSet`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar commented on pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#issuecomment-657255805


   > > > With this, do you think it's possible to support placeholders of some intermediate transform result, without adding them to the schema?
   > > > Also, can the filter use the transformed column? In other words, is the filtering pre-transformation or post-transformation?
   > > 
   > > 
   > > Yes, it should be possible now to add placeholders for intermediate results. **Those intermediate results can be used in the filter.** Filtering is post-transformation, so any of the transformation results can be used in the filter.
   > 
   > Glad to know that. Shall we consider pre-transformation filtering on the conditions not involving transform as well, so that we can save some compute cycles on the transformation?
   > 
   > > However the intermediate results cannot be used in other transform functions at the moment, because we don't follow any particular order while executing the transform functions. This will be supported in the future, when we work on derived columns: #5509
   
   Yes, we could certainly make that optimization. I'll bring those changes in gradually in future PRs; that might require additional config and is best handled on its own.
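   As a sketch of the post-transformation filtering discussed above, a config fragment in the format from the PR description (column and function names here are illustrative, not from the PR): the transform produces an intermediate column, and the filter then consumes it.

   ```
   "ingestionConfig": {
         "transformConfigs": [{
           "columnName": "hoursSinceEpoch",
           "transformFunction": "toEpochHours(millis)"
         }],
         "filterConfig": {
           "filterFunction": "Groovy({hoursSinceEpoch < 438000}, hoursSinceEpoch)"
         }
       }
   ```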




[GitHub] [incubator-pinot] npawar commented on pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#issuecomment-657873737


   > LGTM with minor comments. Please work with @siddharthteotia to decide how to merge the 2 conflicting PRs
   
   Checked with @siddharthteotia. I'll merge this first, since it's ready to go.




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453992786



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests ingestion configs on a hybrid table
+ */
+public class IngestionConfigHybridIntegrationTest extends BaseClusterIntegrationTest {
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+  private static final long FILTERED_COUNT_STAR_RESULT = 24047L;

Review comment:
       Let's document how this value is calculated (query result of `SELECT COUNT(*) FROM mytable WHERE AirlineID != 19393 AND ArrDelayMinutes > 5` on unfiltered data)

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests ingestion configs on a hybrid table
+ */
+public class IngestionConfigHybridIntegrationTest extends BaseClusterIntegrationTest {
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";

Review comment:
       Remove this line

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests ingestion configs on a hybrid table
+ */
+public class IngestionConfigHybridIntegrationTest extends BaseClusterIntegrationTest {
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+  private static final long FILTERED_COUNT_STAR_RESULT = 24047L;
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }

Review comment:
       Remove

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTest.java
##########
@@ -43,14 +43,28 @@
 public class ExpressionTransformerTest {
 
   @Test
-  public void testGroovyExpressionTransformer()
-      throws IOException {
-    URL resource = AbstractRecordExtractorTest.class.getClassLoader()
-        .getResource("data/expression_transformer/groovy_expression_transformer.json");
-    File schemaFile = new File(resource.getFile());
-    Schema pinotSchema = Schema.fromFile(schemaFile);
-
-    ExpressionTransformer expressionTransformer = new ExpressionTransformer(pinotSchema);
+  public void testTransformConfigsFromTable() {

Review comment:
       (nit)
   ```suggestion
     public void testTransformConfigsFromTableConfig() {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar commented on pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#issuecomment-656802312


   > With this, do you think it's possible to support placeholders of some intermediate transform result, without adding them to the schema?
   > Also, can the filter use the transformed column? In other words, is the filtering pre-transformation or post-transformation?
   
   Yes, it should be possible now to add placeholders for intermediate results. **Those intermediate results can be used in the filter.** Filtering is post-transformation, so any of the results of the transformations can be used in the filter.
   However, the intermediate results cannot be used in other transform functions at the moment, because we don't follow any particular order while executing the transform functions. This will be supported in the future, when we work on derived columns: https://github.com/apache/incubator-pinot/issues/5509
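   For illustration, a config along these lines should be possible once this lands (the column names `derivedFoo` and `foo` here are hypothetical placeholders, not taken from the PR):
   ```
   "ingestionConfig": {
     "transformConfigs": [{
       "columnName": "derivedFoo",
       "transformFunction": "lower(foo)"
     }],
     "filterConfig": {
       "filterFunction": "Groovy({derivedFoo == \"skip_me\"}, derivedFoo)"
     }
   }
   ```
   Since filtering runs after the transformations, `derivedFoo` can be referenced in the filter even though it is only produced at ingestion time.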




[GitHub] [incubator-pinot] npawar commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453981152



##########
File path: pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTest.java
##########
@@ -43,14 +43,28 @@
 public class ExpressionTransformerTest {
 
   @Test
-  public void testGroovyExpressionTransformer()
-      throws IOException {
-    URL resource = AbstractRecordExtractorTest.class.getClassLoader()

Review comment:
       I've split this test into two: testTransformFunctionsInSchema and testTransformFunctionsInTable. They are both in the same file.






[GitHub] [incubator-pinot] yupeng9 commented on pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#issuecomment-656807868


   > > With this, do you think it's possible to support placeholders of some intermediate transform result, without adding them to the schema?
   > > Also, can the filter use the transformed column? In other words, is the filtering pre-transformation or post-transformation?
   > 
   > Yes, it should be possible now to add placeholders for intermediate results. **Those intermediate results can be used in filter** Filtering is post-transformation, and so any of the results of transformation can be used in filter.
   
   Glad to know that. Shall we also consider pre-transformation filtering for conditions that don't involve transforms, so that we can save some compute cycles on the transformation?
   > However the intermediate results cannot be used in other transform functions at the moment, because we don't follow any particular order while executing the transform functions. This will be supported in the future, when we work on derived columns: #5509
   
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453982110



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that converts Avro data for 12 segments and runs queries against it.
+ */
+public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @Override
+  protected String getTimeColumnName() {
+    return TIME_COLUMN_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 22300;

Review comment:
       It was an effect of using a different number of files compared to the Hybrid test. Changed it to be exactly like the Hybrid test; the number is now 24047.






[GitHub] [incubator-pinot] yupeng9 commented on pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#issuecomment-656758962


   With this, do you think it's possible to support placeholders of some intermediate transform result, without adding them to the schema?
   Also, can the filter use the transformed column? In other words, is the filtering pre-transformation or post-transformation?




[GitHub] [incubator-pinot] npawar commented on a change in pull request #5681: TransformConfig in IngestionConfig for ingestion transformations

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5681:
URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453979896



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+
+
+/**
+ * Utils related to table config operations
+ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done
+ */
+public final class TableConfigUtils {
+
+  private TableConfigUtils() {
+
+  }
+
+  /**
+   * Validates the table config with the following rules:
+   * <ul>
+   *   <li>Text index column must be raw</li>
+   *   <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li>
+   * </ul>
+   */
+  public static void validate(TableConfig tableConfig) {
+    validateFieldConfigList(tableConfig);
+    validateValidationConfig(tableConfig);
+    validateIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  private static void validateFieldConfigList(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns();
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) {
+          // For Text index column, it must be raw (no-dictionary)
+          // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs
+          String column = fieldConfig.getName();
+          if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null
+              || !noDictionaryColumns.contains(column)) {
+            throw new IllegalStateException(
+                "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig");
+          }
+        }
+      }
+    }
+  }
+
+  private static void validateValidationConfig(TableConfig tableConfig) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig != null) {
+      if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) {
+        throw new IllegalStateException("Must provide time column in real-time table config");
+      }
+      String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
+      if (peerSegmentDownloadScheme != null) {
+        if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
+          throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https");
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates the following:
+   * 1. validity of filter function
+   * 2. checks for duplicate transform configs
+   * 3. checks for null column name or transform function in transform config
+   * 4. validity of transform function string
+   * 5. checks for source fields used in destination columns
+   */
+  private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) {
+    if (ingestionConfig != null) {
+      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
+      if (filterConfig != null) {
+        String filterFunction = filterConfig.getFilterFunction();
+        if (filterFunction != null) {
+          try {
+            FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException("Invalid filter function " + filterFunction, e);
+          }
+        }
+      }
+      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
+      if (transformConfigs != null) {
+        Set<String> transformColumns = new HashSet<>();
+        for (TransformConfig transformConfig : transformConfigs) {
+          String columnName = transformConfig.getColumnName();
+          if (transformColumns.contains(columnName)) {
+            throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
+          }
+          transformColumns.add(columnName);
+          String transformFunction = transformConfig.getTransformFunction();
+          if (columnName == null || transformFunction == null) {
+            throw new IllegalStateException("columnName/transformFunction cannot be null in TransformConfig " + transformConfig);
+          }
+          FunctionEvaluator expressionEvaluator;
+          try {
+            expressionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(transformFunction);
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid transform function '" + transformFunction + "' for column '" + columnName + "'");
+          }
+          List<String> arguments = expressionEvaluator.getArguments();

Review comment:
       Yes, sounds good. Added.



