You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by sa...@apache.org on 2024/02/27 04:44:29 UTC

(pinot) branch master updated: Record enricher (#12243)

This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 07daa7bbaa Record enricher (#12243)
07daa7bbaa is described below

commit 07daa7bbaa92c882758f086d4d30dad96c273473
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Tue Feb 27 10:14:23 2024 +0530

    Record enricher (#12243)
    
    * Enricher interface
    
    * Support in realtime ingestion
    
    * Fix offline segment builder
    
    * Add enricher to mapper
    
    * Add columnsToExtract config
    
    * Complete enricher pipeline usage
    
    * Enrichment configs
    
    * Add transform function based enrichment
    
    * Change package name
    
    * Change package name
    
    * Review comments
    
    * Add table config validation
    
    * Review comments
    
    * Change to fieldToFunctionMap
    
    ---------
    
    Co-authored-by: Saurabh Dubey <sa...@saurabhs-macbook-pro-1.tail8a064.ts.net>
    Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
 .../connector/flink/sink/FlinkSegmentWriter.java   |   4 +
 .../realtime/RealtimeSegmentDataManager.java       |  11 +++
 .../framework/SegmentProcessorFramework.java       |   2 +
 .../segment/processing/mapper/SegmentMapper.java   |   4 +
 .../apache/pinot/queries/TransformQueriesTest.java |   2 +-
 .../record/enricher/clp/CLPEncodingEnricher.java   | 100 +++++++++++++++++++++
 .../enricher/clp/CLPEncodingEnricherFactory.java   |  51 +++++++++++
 .../record/enricher/clp/ClpEnricherConfig.java     |  40 +++++++++
 .../enricher/function/CustomFunctionEnricher.java  |  66 ++++++++++++++
 .../function/CustomFunctionEnricherConfig.java     |  40 +++++++++
 .../function/CustomFunctionEnricherFactory.java    |  61 +++++++++++++
 .../converter/RealtimeSegmentConverter.java        |   4 +-
 .../RecordReaderSegmentCreationDataSource.java     |   9 ++
 .../impl/SegmentIndexCreationDriverImpl.java       |  12 ++-
 .../pinot/segment/local/utils/IngestionUtils.java  |   8 +-
 .../segment/local/utils/TableConfigUtils.java      |  13 +++
 .../spi/annotations/RecordEnricherFactory.java     |  30 +++++++
 .../config/table/ingestion/EnrichmentConfig.java   |  49 ++++++++++
 .../config/table/ingestion/IngestionConfig.java    |  14 +++
 .../pinot/spi/recordenricher/RecordEnricher.java   |  40 +++++++++
 .../spi/recordenricher/RecordEnricherConfig.java   |  23 +++++
 .../spi/recordenricher/RecordEnricherFactory.java  |  29 ++++++
 .../spi/recordenricher/RecordEnricherPipeline.java |  75 ++++++++++++++++
 .../spi/recordenricher/RecordEnricherRegistry.java |  62 +++++++++++++
 .../RecordEnricherValidationConfig.java            |  35 ++++++++
 25 files changed, 777 insertions(+), 7 deletions(-)

diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
index 8815bbf37a..e9bd29701b 100644
--- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
+++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
@@ -53,6 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private String _outputDirURI;
   private Schema _schema;
   private Set<String> _fieldsToRead;
+  private RecordEnricherPipeline _recordEnricherPipeline;
   private RecordTransformer _recordTransformer;
 
   private File _stagingDir;
@@ -137,6 +139,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
 
     _schema = schema;
     _fieldsToRead = _schema.getColumnNames();
+    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
     _reusableRecord = new GenericData.Record(_avroSchema);
@@ -172,6 +175,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
   public void collect(GenericRow row)
       throws IOException {
     long startTime = System.currentTimeMillis();
+    _recordEnricherPipeline.run(row);
     GenericRow transform = _recordTransformer.transform(row);
     SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
     _rowCount++;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d64e85fada..5dd8fc025a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
@@ -272,6 +273,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private final int _partitionGroupId;
   private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
   final String _clientId;
+  private final RecordEnricherPipeline _recordEnricherPipeline;
   private final TransformPipeline _transformPipeline;
   private PartitionGroupConsumer _partitionGroupConsumer = null;
   private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -575,6 +577,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         _numRowsErrored++;
       } else {
         try {
+          _recordEnricherPipeline.run(decodedRow.getResult());
           _transformPipeline.processRow(decodedRow.getResult(), reusedResult);
         } catch (Exception e) {
           _numRowsErrored++;
@@ -1480,6 +1483,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
           new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
       throw e;
     }
+
+    try {
+      _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
+    } catch (Exception e) {
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+          new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
+      throw e;
+    }
     _transformPipeline = new TransformPipeline(tableConfig, schema);
     // Acquire semaphore to create stream consumers
     try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 868583d0d3..def4f75b29 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -289,6 +290,7 @@ public class SegmentProcessorFramework {
           GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
           SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
           driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
+              RecordEnricherPipeline.getPassThroughPipeline(),
               TransformPipeline.getPassThroughPipeline());
           driver.build();
           outputSegmentDirs.add(driver.getOutputDirectory());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 407cbd4dcd..77b651d943 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +69,7 @@ public class SegmentMapper {
   private final List<FieldSpec> _fieldSpecs;
   private final boolean _includeNullFields;
   private final int _numSortFields;
+  private final RecordEnricherPipeline _recordEnricherPipeline;
   private final CompositeTransformer _recordTransformer;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
@@ -92,6 +94,7 @@ public class SegmentMapper {
     _fieldSpecs = pair.getLeft();
     _numSortFields = pair.getRight();
     _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
+    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
     _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
     _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
     List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
@@ -166,6 +169,7 @@ public class SegmentMapper {
     observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
     while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
       reuse = recordReader.next(reuse);
+      _recordEnricherPipeline.run(reuse);
 
       // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 572080e44e..1f04d16d3b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -132,7 +132,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
 
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
-        .setIngestionConfig(new IngestionConfig(null, null, null,
+        .setIngestionConfig(new IngestionConfig(null, null, null, null,
             Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3  == null || "
                 + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
             null, null, null))
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java
new file mode 100644
index 0000000000..790015d8af
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Enriches the record with CLP encoded fields.
+ * For a column 'x', it adds three new columns to the record:
+ * 1. 'x_logtype' - The logtype of the encoded message
+ * 2. 'x_dictVars' - The dictionary variables of the encoded message
+ * 3. 'x_encodedVars' - The encoded variables of the encoded message
+ */
+public class CLPEncodingEnricher implements RecordEnricher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+  private final ClpEnricherConfig _config;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+
+  public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException {
+    _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class);
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _config.getFields();
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _config.getFields()) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);
+    }
+  }
+
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (null != value) {
+      if (value instanceof String) {
+        String valueAsString = (String) value;
+        try {
+          _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+          logtype = _clpEncodedMessage.getLogTypeAsString();
+          encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+          dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        } catch (IOException e) {
+          LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
+              e.getMessage());
+        }
+      } else {
+        LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
+            value.getClass().getSimpleName(), key, value);
+      }
+    }
+
+    to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+    to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+    to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java
new file mode 100644
index 0000000000..cacc4fdbc9
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+@AutoService(RecordEnricherFactory.class)
+public class CLPEncodingEnricherFactory implements RecordEnricherFactory {
+  private static final String ENRICHER_TYPE = "clpEnricher";
+  @Override
+  public String getEnricherType() {
+    return ENRICHER_TYPE;
+  }
+
+  @Override
+  public RecordEnricher createEnricher(JsonNode enricherProps)
+      throws IOException {
+    return new CLPEncodingEnricher(enricherProps);
+  }
+
+  @Override
+  public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) {
+    try {
+      ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse clp enricher config", e);
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java
new file mode 100644
index 0000000000..93439602ec
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Configuration for the CLP enricher.
+ */
+public class ClpEnricherConfig {
+  private final List<String> _fields;
+
+  @JsonCreator
+  public ClpEnricherConfig(@JsonProperty("fields") List<String> fields) {
+    _fields = fields;
+  }
+
+  public List<String> getFields() {
+    return _fields;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java
new file mode 100644
index 0000000000..92cf565220
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Enriches the record with custom functions.
+ */
+public class CustomFunctionEnricher implements RecordEnricher {
+  private final Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private final List<String> _fieldsToExtract;
+
+  public CustomFunctionEnricher(JsonNode enricherProps) throws IOException {
+    CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class);
+    _fieldToFunctionEvaluator = new LinkedHashMap<>();
+    _fieldsToExtract = new ArrayList<>();
+    for (Map.Entry<String, String> entry : config.getFieldToFunctionMap().entrySet()) {
+      String column = entry.getKey();
+      String function = entry.getValue();
+      FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(function);
+      _fieldToFunctionEvaluator.put(column, functionEvaluator);
+      _fieldsToExtract.addAll(functionEvaluator.getArguments());
+    }
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fieldsToExtract;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    _fieldToFunctionEvaluator.forEach((field, evaluator) -> {
+      record.putValue(field, evaluator.evaluate(record));
+    });
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java
new file mode 100644
index 0000000000..cc4270f601
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.LinkedHashMap;
+
+/**
+ * Configuration for the custom function enricher.
+ */
+public class CustomFunctionEnricherConfig {
+  private final LinkedHashMap<String, String> _fieldToFunctionMap;
+
+  @JsonCreator
+  public CustomFunctionEnricherConfig(
+      @JsonProperty("fieldToFunctionMap") LinkedHashMap<String, String> columnTofunctionMap) {
+    _fieldToFunctionMap = columnTofunctionMap;
+  }
+
+  public LinkedHashMap<String, String> getFieldToFunctionMap() {
+    return _fieldToFunctionMap;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java
new file mode 100644
index 0000000000..f773081903
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+@AutoService(RecordEnricherFactory.class)
+public class CustomFunctionEnricherFactory implements RecordEnricherFactory {
+  private static final String TYPE = "generateColumn";
+  @Override
+  public String getEnricherType() {
+    return TYPE;
+  }
+
+  @Override
+  public RecordEnricher createEnricher(JsonNode enricherProps)
+      throws IOException {
+    return new CustomFunctionEnricher(enricherProps);
+  }
+
+  @Override
+  public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) {
+    CustomFunctionEnricherConfig config;
+    try {
+      config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class);
+      if (!validationConfig.isGroovyDisabled()) {
+        return;
+      }
+      for (String function : config.getFieldToFunctionMap().values()) {
+        if (FunctionEvaluatorFactory.isGroovyExpression(function)) {
+          throw new IllegalArgumentException("Groovy expression is not allowed for enrichment");
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse custom function enricher config", e);
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index a6e2d067b6..ffb9bfc23f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -42,6 +42,7 @@ import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 
 
 public class RealtimeSegmentConverter {
@@ -125,7 +126,8 @@ public class RealtimeSegmentConverter {
       recordReader.init(_realtimeSegmentImpl, sortedDocIds);
       RealtimeSegmentSegmentCreationDataSource dataSource =
           new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
-      driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());
+      driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(),
+          TransformPipeline.getPassThroughPipeline());
 
       if (!_enableColumnMajor) {
         driver.build();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
index 411c040cdd..219a207d94 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +39,17 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
   private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class);
 
   private final RecordReader _recordReader;
+  private RecordEnricherPipeline _recordEnricherPipeline;
   private TransformPipeline _transformPipeline;
 
   public RecordReaderSegmentCreationDataSource(RecordReader recordReader) {
     _recordReader = recordReader;
   }
 
+  public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) {
+    _recordEnricherPipeline = recordEnricherPipeline;
+  }
+
   public void setTransformPipeline(TransformPipeline transformPipeline) {
     _transformPipeline = transformPipeline;
   }
@@ -51,6 +57,8 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
   @Override
   public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) {
     try {
+      RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline
+          : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig());
       TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline
           : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
 
@@ -64,6 +72,7 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
         reuse.clear();
 
         reuse = _recordReader.next(reuse);
+        recordEnricherPipeline.run(reuse);
         transformPipeline.processRow(reuse, reusedResult);
         for (GenericRow row : reusedResult.getTransformedRows()) {
           collector.collectRow(row);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 129bc35ccf..e99d89c8b4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -79,6 +79,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.slf4j.Logger;
@@ -100,6 +101,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   private SegmentCreator _indexCreator;
   private SegmentIndexCreationInfo _segmentIndexCreationInfo;
   private Schema _dataSchema;
+  private RecordEnricherPipeline _recordEnricherPipeline;
   private TransformPipeline _transformPipeline;
   private IngestionSchemaValidator _ingestionSchemaValidator;
   private int _totalDocs = 0;
@@ -157,17 +159,20 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
     SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
-    init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
+    init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+        new TransformPipeline(config.getTableConfig(), config.getSchema()));
   }
 
   @Deprecated
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
       RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer)
       throws Exception {
-    init(config, dataSource, new TransformPipeline(recordTransformer, complexTypeTransformer));
+    init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+        new TransformPipeline(recordTransformer, complexTypeTransformer));
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
+      RecordEnricherPipeline enricherPipeline,
       TransformPipeline transformPipeline)
       throws Exception {
     _config = config;
@@ -178,9 +183,11 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     if (config.isFailOnEmptySegment()) {
       Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
     }
+    _recordEnricherPipeline = enricherPipeline;
     _transformPipeline = transformPipeline;
     // Use the same transform pipeline if the data source is backed by a record reader
     if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
+      ((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline);
       ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
     }
 
@@ -244,6 +251,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
           // Should not be needed anymore.
           // Add row to indexes
+          _recordEnricherPipeline.run(decodedRow);
           _transformPipeline.processRow(decodedRow, reusedResult);
 
           recordReadStopTime = System.nanoTime();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 315bd5becf..203d7d930a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -65,6 +65,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
@@ -379,15 +380,16 @@ public final class IngestionUtils {
           expressionContext.getColumns(fields);
         }
       }
+
+      fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract());
       List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
       if (transformConfigs != null) {
         for (TransformConfig transformConfig : transformConfigs) {
           FunctionEvaluator expressionEvaluator =
               FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction());
           fields.addAll(expressionEvaluator.getArguments());
-          fields.add(transformConfig
-              .getColumnName()); // add the column itself too, so that if it is already transformed, we won't
-          // transform again
+          // add the column itself too, so that if it is already transformed, we won't transform again
+          fields.add(transformConfig.getColumnName());
         }
       }
       ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 83c890d5f9..43fd69488b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -73,6 +73,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
@@ -82,6 +83,8 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -505,6 +508,16 @@ public final class TableConfigUtils {
         });
       }
 
+      // Enrichment configs
+      List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
+      if (enrichmentConfigs != null) {
+        for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
+          RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig,
+              new RecordEnricherValidationConfig(disableGroovy));
+        }
+      }
+
+
       // Transform configs
       List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
       if (transformConfigs != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java
new file mode 100644
index 0000000000..5e58c3a3fd
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Marker annotation for record enricher factory implementations, retained at runtime on types.
+ *
+ * <p>NOTE(review): this shares its simple name with the {@code recordenricher.RecordEnricherFactory} interface,
+ * and RecordEnricherRegistry discovers factories via {@link java.util.ServiceLoader} on that interface rather
+ * than by reading this annotation; confirm whether the annotation is still needed.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface RecordEnricherFactory {
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java
new file mode 100644
index 0000000000..9b295a78e4
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Ingestion config entry describing one record enricher to apply to incoming records.
+ *
+ * <p>{@code enricherType} selects the registered enricher implementation, and {@code properties} holds its
+ * type-specific settings as raw JSON that is handed unchanged to the matching enricher factory.
+ */
+public class EnrichmentConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Enricher type")
+  private final String _enricherType;
+
+  @JsonPropertyDescription("Enricher properties")
+  private final JsonNode _properties;
+
+  @JsonCreator
+  public EnrichmentConfig(@JsonProperty("enricherType") String enricherType,
+      @JsonProperty("properties") JsonNode properties) {
+    _enricherType = enricherType;
+    _properties = properties;
+  }
+
+  /**
+   * Returns the enricher type name used to look up the enricher factory.
+   */
+  public String getEnricherType() {
+    return _enricherType;
+  }
+
+  /**
+   * Returns the enricher-specific properties, passed as-is to the enricher factory.
+   */
+  public JsonNode getProperties() {
+    return _properties;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 86ff9cec6f..5af9cdcc50 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -39,6 +39,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to filtering records during ingestion")
   private FilterConfig _filterConfig;
 
+  @JsonPropertyDescription("Config related to enriching records during ingestion")
+  private List<EnrichmentConfig> _enrichmentConfigs;
+
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private List<TransformConfig> _transformConfigs;
 
@@ -63,12 +66,14 @@ public class IngestionConfig extends BaseJsonConfig {
   @Deprecated
   public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
       @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig,
+      @Nullable List<EnrichmentConfig> enrichmentConfigs,
       @Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig,
       @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig,
       @Nullable List<AggregationConfig> aggregationConfigs) {
     _batchIngestionConfig = batchIngestionConfig;
     _streamIngestionConfig = streamIngestionConfig;
     _filterConfig = filterConfig;
+    _enrichmentConfigs = enrichmentConfigs;
     _transformConfigs = transformConfigs;
     _complexTypeConfig = complexTypeConfig;
     _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
@@ -93,6 +98,11 @@ public class IngestionConfig extends BaseJsonConfig {
     return _filterConfig;
   }
 
+  @Nullable
+  public List<EnrichmentConfig> getEnrichmentConfigs() {
+    return _enrichmentConfigs;
+  }
+
   @Nullable
   public List<TransformConfig> getTransformConfigs() {
     return _transformConfigs;
@@ -137,6 +147,10 @@ public class IngestionConfig extends BaseJsonConfig {
     _filterConfig = filterConfig;
   }
 
+  public void setEnrichmentConfigs(List<EnrichmentConfig> enrichmentConfigs) {
+    _enrichmentConfigs = enrichmentConfigs;
+  }
+
   public void setTransformConfigs(List<TransformConfig> transformConfigs) {
     _transformConfigs = transformConfigs;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java
new file mode 100644
index 0000000000..8e48ed7102
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.util.List;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Enriches ingested records in place by deriving additional columns from existing ones.
+ * If a column written by the enricher already exists in the record, its value is overwritten.
+ */
+public interface RecordEnricher {
+  /**
+   * Returns the names of the input columns this enricher reads.
+   * They are added to the set of fields extracted from the source data, so that they are present when
+   * {@link #enrich(GenericRow)} runs.
+   */
+  List<String> getInputColumns();
+
+  /**
+   * Enriches the given record in place by adding (or overwriting) the derived columns.
+   *
+   * @param record the record to enrich; mutated directly
+   */
+  void enrich(GenericRow record);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java
new file mode 100644
index 0000000000..adac697bdb
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+/**
+ * Configuration for a record enricher.
+ *
+ * <p>NOTE(review): the contract of {@link #parse()} is not specified in this change; presumably it reads and
+ * validates the enricher's raw properties. Confirm against the implementing config classes.
+ */
+public interface RecordEnricherConfig {
+  /**
+   * Parses this configuration (see the class-level note on its contract).
+   */
+  void parse();
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java
new file mode 100644
index 0000000000..4b55d04260
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+
+
+/**
+ * Creates and validates record enrichers of a single type.
+ *
+ * <p>Implementations are discovered through {@link java.util.ServiceLoader} by {@link RecordEnricherRegistry}
+ * and registered under the name returned by {@link #getEnricherType()}.
+ */
+public interface RecordEnricherFactory {
+  /**
+   * Returns the enricher type this factory handles; matched against the {@code enricherType} of an
+   * enrichment config.
+   */
+  String getEnricherType();
+
+  /**
+   * Creates an enricher from its JSON properties.
+   *
+   * @param enricherProps the {@code properties} node of the enrichment config
+   * @return the configured enricher
+   * @throws IOException if the properties cannot be processed (implementation-specific)
+   */
+  RecordEnricher createEnricher(JsonNode enricherProps) throws IOException;
+
+  /**
+   * Validates the enricher properties against the given cluster constraints.
+   * Implementations are expected to throw if the properties are invalid.
+   *
+   * @param enricherProps the {@code properties} node of the enrichment config
+   * @param validationConfig constraints to validate against, e.g. whether Groovy is disabled
+   */
+  void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java
new file mode 100644
index 0000000000..5a50d685ca
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Ordered chain of {@link RecordEnricher}s applied to each ingested record.
+ *
+ * <p>The pipeline also tracks the union of input columns required by its enrichers, so that callers can make
+ * sure those fields are extracted from the source data.
+ */
+public class RecordEnricherPipeline {
+  private final List<RecordEnricher> _enrichers = new ArrayList<>();
+  private final Set<String> _columnsToExtract = new HashSet<>();
+
+  /**
+   * Returns a pipeline with no enrichers; {@link #run(GenericRow)} leaves records untouched.
+   */
+  public static RecordEnricherPipeline getPassThroughPipeline() {
+    return new RecordEnricherPipeline();
+  }
+
+  /**
+   * Builds a pipeline from the enrichment configs of the given ingestion config, preserving their order.
+   *
+   * @param ingestionConfig the ingestion config; {@code null}, or one without enrichment configs, yields a
+   *                        pass-through pipeline
+   * @return the pipeline
+   * @throws IllegalArgumentException if an enrichment config references an unregistered enricher type
+   * @throws RuntimeException wrapping the {@link IOException} if an enricher fails to instantiate
+   */
+  public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) {
+    RecordEnricherPipeline pipeline = new RecordEnricherPipeline();
+    if (ingestionConfig == null || ingestionConfig.getEnrichmentConfigs() == null) {
+      return pipeline;
+    }
+    for (EnrichmentConfig enrichmentConfig : ingestionConfig.getEnrichmentConfigs()) {
+      try {
+        pipeline.add(RecordEnricherRegistry.createRecordEnricher(enrichmentConfig));
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e);
+      }
+    }
+    return pipeline;
+  }
+
+  /**
+   * Builds a pipeline from the ingestion config of the given table config.
+   *
+   * @param tableConfig the table config; {@code null} yields a pass-through pipeline
+   * @return the pipeline
+   */
+  public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) {
+    return tableConfig == null ? getPassThroughPipeline() : fromIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  /**
+   * Returns the union of input columns required by the enrichers in this pipeline.
+   *
+   * @return a read-only view that also reflects enrichers added later via {@link #add(RecordEnricher)}
+   */
+  public Set<String> getColumnsToExtract() {
+    return Collections.unmodifiableSet(_columnsToExtract);
+  }
+
+  /**
+   * Appends an enricher to the end of the pipeline and records its input columns.
+   */
+  public void add(RecordEnricher enricher) {
+    _enrichers.add(enricher);
+    _columnsToExtract.addAll(enricher.getInputColumns());
+  }
+
+  /**
+   * Runs every enricher on the record, in order, mutating it in place.
+   */
+  public void run(GenericRow record) {
+    for (RecordEnricher enricher : _enrichers) {
+      enricher.enrich(record);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java
new file mode 100644
index 0000000000..59c1e6fb34
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry of {@link RecordEnricherFactory} implementations, keyed by enricher type.
+ *
+ * <p>Factories are discovered once, at class initialization, through {@link ServiceLoader}. The registry is
+ * read-only afterwards.
+ */
+public class RecordEnricherRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class);
+  private static final Map<String, RecordEnricherFactory> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>();
+
+  static {
+    for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) {
+      String enricherType = recordEnricherFactory.getEnricherType();
+      RecordEnricherFactory previous = RECORD_ENRICHER_FACTORY_MAP.put(enricherType, recordEnricherFactory);
+      if (previous != null) {
+        // A later factory with the same type replaces the earlier one; make the clash visible instead of silent
+        LOGGER.warn("Record enricher factory type: {} is registered by both {} and {}; using the latter",
+            enricherType, previous.getClass().getName(), recordEnricherFactory.getClass().getName());
+      }
+      LOGGER.info("Registered record enricher factory type: {}", enricherType);
+    }
+  }
+
+  private RecordEnricherRegistry() {
+  }
+
+  /**
+   * Validates an enrichment config with the factory registered for its enricher type.
+   *
+   * @param enrichmentConfig the enrichment config to validate
+   * @param config cluster constraints passed to the factory
+   * @throws IllegalArgumentException if no factory is registered for the enricher type
+   */
+  public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig,
+      RecordEnricherValidationConfig config) {
+    getFactory(enrichmentConfig.getEnricherType())
+        .validateEnrichmentConfig(enrichmentConfig.getProperties(), config);
+  }
+
+  /**
+   * Creates a record enricher using the factory registered for the config's enricher type.
+   *
+   * @param enrichmentConfig the enrichment config describing the enricher
+   * @return the created enricher
+   * @throws IllegalArgumentException if no factory is registered for the enricher type
+   * @throws IOException if the factory fails to create the enricher
+   */
+  public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig)
+      throws IOException {
+    return getFactory(enrichmentConfig.getEnricherType()).createEnricher(enrichmentConfig.getProperties());
+  }
+
+  /**
+   * Looks up the factory for the given enricher type.
+   *
+   * @throws IllegalArgumentException if no factory is registered for the type
+   */
+  private static RecordEnricherFactory getFactory(String enricherType) {
+    RecordEnricherFactory factory = RECORD_ENRICHER_FACTORY_MAP.get(enricherType);
+    if (factory == null) {
+      throw new IllegalArgumentException("No record enricher found for type: " + enricherType);
+    }
+    return factory;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java
new file mode 100644
index 0000000000..86fb74155e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+/**
+ * Cluster-level constraints used when validating record enricher configs.
+ */
+public class RecordEnricherValidationConfig {
+  private final boolean _groovyDisabled;
+
+  /**
+   * @param groovyDisabled whether Groovy-based functions are disabled for this validation
+   */
+  public RecordEnricherValidationConfig(boolean groovyDisabled) {
+    _groovyDisabled = groovyDisabled;
+  }
+
+  /**
+   * Returns whether Groovy is disabled, which enricher factories can use to reject Groovy-based configs.
+   */
+  public boolean isGroovyDisabled() {
+    return _groovyDisabled;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org